• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13974489001

20 Mar 2025 04:32PM UTC coverage: 56.292% (-2.9%) from 59.168%
13974489001

Pull #8754

github

web-flow
Merge aed149e6b into ea050d06f
Pull Request #8754: Add `Outbound` Remote Signer implementation

594 of 1713 new or added lines in 26 files covered. (34.68%)

23052 existing lines in 272 files now uncovered.

105921 of 188165 relevant lines covered (56.29%)

23796.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.51
/lnwallet/rpcwallet/remote_signer_client.go
1
package rpcwallet
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "os"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        "github.com/btcsuite/btclog/v2"
13
        "github.com/lightningnetwork/lnd/fn/v2"
14
        "github.com/lightningnetwork/lnd/lncfg"
15
        "github.com/lightningnetwork/lnd/lnrpc"
16
        "github.com/lightningnetwork/lnd/lnrpc/signrpc"
17
        "github.com/lightningnetwork/lnd/lnrpc/walletrpc"
18
        "github.com/lightningnetwork/lnd/macaroons"
19
        "google.golang.org/grpc"
20
        "google.golang.org/grpc/credentials"
21
        "google.golang.org/protobuf/reflect/protoreflect"
22
        "gopkg.in/macaroon.v2"
23
)
24

25
type (
26
        signerResponse = walletrpc.SignCoordinatorResponse
27

28
        // registrationResp is a type alias for the registration response type,
29
        // created to keep line length within 80 characters.
30
        registrationResp = walletrpc.SignCoordinatorRequest_RegistrationResponse
31

32
        // completeType is a type alias for the registration complete type,
33
        // created to keep line length within 80 characters.
34
        completeType = walletrpc.RegistrationResponse_RegistrationComplete
35
)
36

37
// registrationMsg is the message that we send to the watch-only node to
38
// initiate the handshake process.
39
// TODO(viktor): This could be extended to include info about the
40
// version of the remote signer in the future.
41
// The RegistrationChallenge should also be set to a randomized string.
42
var registrationMsg = &walletrpc.SignCoordinatorResponse{
43
        RefRequestId: handshakeRequestID,
44
        SignResponseType: &walletrpc.SignCoordinatorResponse_SignerRegistration{
45
                SignerRegistration: &walletrpc.SignerRegistration{
46
                        RegistrationChallenge: "registrationChallenge",
47
                        RegistrationInfo:      "outboundSigner",
48
                },
49
        },
50
}
51

52
var (
53
        // ErrShuttingDown indicates that the server is in the process of
54
        // gracefully exiting.
55
        ErrShuttingDown = errors.New("lnd is shutting down")
56

57
        // ErrRequestType is returned when the request type by the watch-only
58
        // node has not been implemented by remote signer.
59
        ErrRequestType = errors.New("unimplemented request by watch-only node")
60
)
61

62
const (
63
        // defaultRetryTimeout is the default timeout used when retrying to
64
        // connect to the watch-only node.
65
        defaultRetryTimeout = time.Second * 1
66

67
        // retryMultiplier is the multiplier used to increase the retry timeout
68
        // for every retry.
69
        retryMultiplier = 1.5
70

71
        // defaultMaxRetryTimeout is the default max value for the
72
        // maxRetryTimeout, which defines the maximum backoff period before
73
        // attempting to reconnect to the watch-only node.
74
        defaultMaxRetryTimeout = time.Minute * 1
75

76
        // handshakeRequestID is the request ID that is reversed for the
77
        // handshake with the watch-only node.
78
        handshakeRequestID = uint64(1)
79
)
80

81
// Stream represents the stream to the watch-only node with a Close function
82
// that closes the connection.
83
type Stream struct {
84
        StreamClient
85

86
        // Close closes the connection to the watch-only node.
87
        Close func() error
88
}
89

90
// NewStream creates a new Stream instance.
91
func NewStream(client StreamClient, closeConn func() error) *Stream {
7✔
92
        return &Stream{
7✔
93
                StreamClient: client,
7✔
94
                Close:        closeConn,
7✔
95
        }
7✔
96
}
7✔
97

98
// SignCoordinatorStreamFeeder is an interface that returns a newly created
99
// stream to the watch-only node. The stream is used to send and receive
100
// messages between the remote signer client and the watch-only node.
101
type SignCoordinatorStreamFeeder interface {
102
        // GetStream returns a new stream to the watch-only node. The function
103
        // also returns a cleanup function that should be called when the stream
104
        // is no longer needed.
105
        GetStream(ctx context.Context) (*Stream, error)
106

107
        // Stop stops the stream feeder.
108
        Stop()
109
}
110

111
// RemoteSignerClient is an interface that defines the methods that a remote
112
// signer client should implement.
113
type RemoteSignerClient interface {
114
        // Start starts the remote signer client.
115
        Start(ctx context.Context) error
116

117
        // Stop stops the remote signer client.
118
        Stop() error
119

120
        // MustImplementRemoteSignerClient is a no-op method that makes it
121
        // easier to filter structs that implement the RemoteSignerClient
122
        // interface.
123
        MustImplementRemoteSignerClient()
124
}
125

126
// StreamFeeder is an implementation of the SignCoordinatorStreamFeeder
127
// interface that creates a new stream to the watch-only node, by making an
128
// outbound gRPC connection to the watch-only node.
129
type StreamFeeder struct {
130
        wg sync.WaitGroup
131

132
        cfg lncfg.ConnectionCfg
133

134
        cg *fn.ContextGuard
135
}
136

137
// NewStreamFeeder creates a new StreamFeeder instance.
NEW
138
func NewStreamFeeder(cfg lncfg.ConnectionCfg) *StreamFeeder {
×
NEW
139
        return &StreamFeeder{
×
NEW
140
                cfg: cfg,
×
NEW
141
                cg:  fn.NewContextGuard(),
×
NEW
142
        }
×
NEW
143
}
×
144

145
// Stop stops the StreamFeeder and disables the StreamFeeder from creating any
146
// new connections.
147
//
148
// NOTE: This is part of the SignCoordinatorStreamFeeder interface.
NEW
149
func (s *StreamFeeder) Stop() {
×
NEW
150
        s.cg.Quit()
×
NEW
151

×
NEW
152
        s.wg.Wait()
×
NEW
153
}
×
154

155
// GetStream returns a new stream to the watch-only node, by making an
156
// outbound gRPC connection to the watch-only node. The function also returns a
157
// cleanup function that closes the connection, which should be called when the
158
// stream is no longer needed.
159
//
160
// NOTE: This is part of the SignCoordinatorStreamFeeder interface.
NEW
161
func (s *StreamFeeder) GetStream(ctx context.Context) (*Stream, error) {
×
NEW
162
        select {
×
163
        // Don't run if the StreamFeeder has already been shutdown.
NEW
164
        case <-s.cg.Done():
×
NEW
165
                return nil, ErrShuttingDown
×
NEW
166
        default:
×
167
        }
168

169
        // Create a new outbound gRPC connection to the watch-only node.
NEW
170
        conn, err := s.getClientConn(ctx)
×
NEW
171
        if err != nil {
×
NEW
172
                return nil, err
×
NEW
173
        }
×
174

175
        // Wrap the connection in a WalletKitClient.
NEW
176
        walletKitClient := walletrpc.NewWalletKitClient(conn)
×
NEW
177

×
NEW
178
        // Create a new stream to the watch-only node.
×
NEW
179
        streamClient, err := walletKitClient.SignCoordinatorStreams(ctx)
×
NEW
180
        if err != nil {
×
NEW
181
                connErr := conn.Close()
×
NEW
182
                if connErr != nil {
×
NEW
183
                        log.ErrorS(ctx, "Unable to close watch-only node "+
×
NEW
184
                                "connection: %v", connErr)
×
NEW
185
                }
×
186

NEW
187
                return nil, err
×
188
        }
189

NEW
190
        return NewStream(streamClient, conn.Close), nil
×
191
}
192

193
// getClientConn creates a new outbound gRPC connection to the watch-only node.
194
func (s *StreamFeeder) getClientConn(
NEW
195
        ctx context.Context) (*grpc.ClientConn, error) {
×
NEW
196

×
NEW
197
        // Ensure that our top level ctx is derived from the context guard.
×
NEW
198
        // That way we know that we only need to select on the context guard's
×
NEW
199
        // Done channel if the remote signer client is shutting down.
×
NEW
200
        // If we fail to connect to the watch-only node within the
×
NEW
201
        // configured timeout we should return an error.
×
NEW
202
        ctx, cancel := s.cg.Create(ctx, fn.WithCustomTimeoutCG(s.cfg.Timeout))
×
NEW
203
        defer cancel()
×
NEW
204

×
NEW
205
        // Load the specified macaroon file for the watch-only node.
×
NEW
206
        macBytes, err := os.ReadFile(s.cfg.MacaroonPath)
×
NEW
207
        if err != nil {
×
NEW
208
                return nil, fmt.Errorf("could not read macaroon file: %w", err)
×
NEW
209
        }
×
210

NEW
211
        mac := &macaroon.Macaroon{}
×
NEW
212

×
NEW
213
        err = mac.UnmarshalBinary(macBytes)
×
NEW
214
        if err != nil {
×
NEW
215
                return nil, fmt.Errorf("could not unmarshal macaroon: %w", err)
×
NEW
216
        }
×
217

NEW
218
        macCred, err := macaroons.NewMacaroonCredential(mac)
×
NEW
219
        if err != nil {
×
NEW
220
                return nil, fmt.Errorf(
×
NEW
221
                        "could not create macaroon credential: %w", err)
×
NEW
222
        }
×
223

224
        // Load the specified TLS cert for the watch-only node.
NEW
225
        tlsCreds, err := credentials.NewClientTLSFromFile(s.cfg.TLSCertPath, "")
×
NEW
226
        if err != nil {
×
NEW
227
                return nil, fmt.Errorf("could not load TLS cert: %w", err)
×
NEW
228
        }
×
229

NEW
230
        opts := []grpc.DialOption{
×
NEW
231
                grpc.WithBlock(),
×
NEW
232
                grpc.WithTransportCredentials(tlsCreds),
×
NEW
233
                grpc.WithPerRPCCredentials(macCred),
×
NEW
234
        }
×
NEW
235

×
NEW
236
        log.InfoS(ctx, "Attempting to connect to the watch-only node on: %s",
×
NEW
237
                s.cfg.RPCHost)
×
NEW
238

×
NEW
239
        // Connect to the watch-only node using the new context.
×
NEW
240
        return grpc.DialContext(ctx, s.cfg.RPCHost, opts...)
×
241
}
242

243
// A compile time assertion to ensure StreamFeeder meets the
244
// SignCoordinatorStreamFeeder interface.
245
var _ SignCoordinatorStreamFeeder = (*StreamFeeder)(nil)
246

247
// NoOpClient is a remote signer client that is a no op, and is used when the
248
// configuration doesn't enable the use of a remote signer client.
249
type NoOpClient struct{}
250

251
// Start is a no-op.
252
//
253
// NOTE: Part of the RemoteSignerClient interface.
NEW
254
func (n *NoOpClient) Start(ctx context.Context) error {
×
NEW
255
        return nil
×
NEW
256
}
×
257

258
// Stop is a no-op.
259
//
260
// NOTE: Part of the RemoteSignerClient interface.
NEW
261
func (n *NoOpClient) Stop() error {
×
NEW
262
        return nil
×
NEW
263
}
×
264

265
// MustImplementRemoteSignerClient is a no-op.
266
//
267
// NOTE: Part of the RemoteSignerClient interface.
NEW
268
func (n *NoOpClient) MustImplementRemoteSignerClient() {}
×
269

270
// A compile time assertion to ensure NoOpClient meets the
271
// RemoteSignerClient interface.
272
var _ RemoteSignerClient = (*NoOpClient)(nil)
273

274
// OutboundClient is a remote signer client which will process and respond to
275
// sign requests from the watch-only node, which are sent over a stream between
276
// the node and a watch-only node.
277
type OutboundClient struct {
278
        stopped atomic.Bool
279

280
        log btclog.Logger
281

282
        // walletServer is the WalletKitServer that the remote signer client
283
        // will use to process walletrpc requests.
284
        walletServer walletrpc.WalletKitServer
285

286
        // signerServer is the SignerServer that the remote signer client will
287
        // use to process signrpc requests.
288
        signerServer signrpc.SignerServer
289

290
        // streamFeeder is the stream feeder that will set up a stream to the
291
        // watch-only node when requested to do so by the remote signer client.
292
        streamFeeder SignCoordinatorStreamFeeder
293

294
        // requestTimeout is the timeout used when sending responses to the
295
        // watch-only node.
296
        requestTimeout time.Duration
297

298
        // retryTimeout is the backoff timeout used when retrying to set up a
299
        // connection to the watch-only node, if the previous connection/attempt
300
        // failed.
301
        retryTimeout time.Duration
302

303
        // maxRetryTimeout is the max value for the retryTimeout, defining
304
        // the maximum backoff period before attempting to reconnect to the
305
        // watch-only node.
306
        maxRetryTimeout time.Duration
307

308
        cg       *fn.ContextGuard
309
        gManager *fn.GoroutineManager
310
}
311

312
// NewOutboundClient creates a new instance of the remote signer client.
313
// The passed subServers need to include a walletrpc.WalletKitServer and a
314
// signrpc.SignerServer, or the OutboundClient will be disabled.
315
// Note that the client will only fully start if the configuration
316
// enables an outbound remote signer.
317
func NewOutboundClient(walletServer walletrpc.WalletKitServer,
318
        signerServer signrpc.SignerServer,
319
        streamFeeder SignCoordinatorStreamFeeder,
320
        requestTimeout time.Duration) (*OutboundClient, error) {
5✔
321

5✔
322
        if walletServer == nil || signerServer == nil {
5✔
NEW
323
                return nil, errors.New("sub-servers cannot be nil when using " +
×
NEW
324
                        "an outbound remote signer")
×
NEW
325
        }
×
326

327
        if streamFeeder == nil {
5✔
NEW
328
                return nil, errors.New("streamFeeder cannot be nil")
×
NEW
329
        }
×
330

331
        return &OutboundClient{
5✔
332
                log:             log.WithPrefix("Remote signer client: "),
5✔
333
                walletServer:    walletServer,
5✔
334
                signerServer:    signerServer,
5✔
335
                streamFeeder:    streamFeeder,
5✔
336
                requestTimeout:  requestTimeout,
5✔
337
                retryTimeout:    defaultRetryTimeout,
5✔
338
                maxRetryTimeout: defaultMaxRetryTimeout,
5✔
339
                cg:              fn.NewContextGuard(),
5✔
340
                gManager:        fn.NewGoroutineManager(),
5✔
341
        }, nil
5✔
342
}
343

344
// Start starts the remote signer client. The function will continuously try to
345
// set up a connection to the configured watch-only node, and retry to connect
346
// if the connection fails until we Stop the remote signer client.
347
//
348
// NOTE: Part of the RemoteSignerClient interface.
349
func (r *OutboundClient) Start(ctx context.Context) error {
5✔
350
        // Ensure that our top level ctx is derived from the context guard.
5✔
351
        // That way we know that we only need to select on the context guard's
5✔
352
        // Done channel if the remote signer client is shutting down.
5✔
353
        ctx, _ = r.cg.Create(ctx)
5✔
354

5✔
355
        success := r.gManager.Go(ctx, r.runForever)
5✔
356
        if !success {
5✔
NEW
357
                return errors.New("failed to start remote signer client")
×
NEW
358
        }
×
359

360
        return nil
5✔
361
}
362

363
// runForever continuously tries to set up a connection to the watch-only node,
364
// and retry to connect if the connection fails until we Stop the remote
365
// signer client.
366
func (r *OutboundClient) runForever(ctx context.Context) {
5✔
367
        for {
16✔
368
                // Check if we are shutting down.
11✔
369
                select {
11✔
NEW
370
                case <-ctx.Done():
×
NEW
371
                        return
×
372
                default:
11✔
373
                }
374

375
                err := r.runOnce(ctx)
11✔
376
                if err != nil {
22✔
377
                        r.log.ErrorS(ctx, "runOnce error", err)
11✔
378
                }
11✔
379

380
                r.log.InfoS(
11✔
381
                        ctx,
11✔
382
                        "Connection retry to watch-only node scheduled",
11✔
383
                        "retry_after", r.retryTimeout,
11✔
384
                )
11✔
385

11✔
386
                // Backoff before retrying to connect to the watch-only node.
11✔
387
                select {
11✔
388
                case <-ctx.Done():
5✔
389
                        return
5✔
390
                case <-time.After(r.retryTimeout):
6✔
391
                }
392

393
                r.log.InfoS(ctx, "Retrying to connect to watch-only node")
6✔
394

6✔
395
                // Increase the retry timeout by 50% for every retry.
6✔
396
                r.retryTimeout = time.Duration(
6✔
397
                        float64(r.retryTimeout) * retryMultiplier,
6✔
398
                )
6✔
399

6✔
400
                // But cap the retryTimeout at r.maxRetryTimeout
6✔
401
                if r.retryTimeout > r.maxRetryTimeout {
8✔
402
                        r.retryTimeout = r.maxRetryTimeout
2✔
403
                }
2✔
404
        }
405
}
406

407
// Stop stops the remote signer client.
408
//
409
// NOTE: Part of the RemoteSignerClient interface.
410
func (r *OutboundClient) Stop() error {
5✔
411
        if r.stopped.Swap(true) {
5✔
NEW
412
                return errors.New("remote signer client is already shut down")
×
NEW
413
        }
×
414

415
        r.log.Info("Shutting down")
5✔
416

5✔
417
        r.cg.Quit()
5✔
418

5✔
419
        r.streamFeeder.Stop()
5✔
420

5✔
421
        r.gManager.Stop()
5✔
422

5✔
423
        r.log.Debugf("Shutdown complete")
5✔
424

5✔
425
        return nil
5✔
426
}
427

428
// MustImplementRemoteSignerClient is a no-op.
429
//
430
// NOTE: Part of the RemoteSignerClient interface.
NEW
431
func (r *OutboundClient) MustImplementRemoteSignerClient() {}
×
432

433
// runOnce creates a new stream to the watch-only node, and starts processing
434
// and responding to the sign requests that are sent over the stream. The
435
// function will continuously run until the remote signer client is either
436
// stopped or the stream errors.
437
func (r *OutboundClient) runOnce(ctx context.Context) error {
11✔
438
        // Derive a context for the lifetime of the stream.
11✔
439
        ctx, cancel := r.cg.Create(ctx)
11✔
440

11✔
441
        // Cancel the stream context whenever we return from this function.
11✔
442
        defer cancel()
11✔
443

11✔
444
        log.InfoS(ctx, "Attempting to setup the watch-only node connection")
11✔
445

11✔
446
        // Try to get a new stream to the watch-only node.
11✔
447
        stream, err := r.streamFeeder.GetStream(ctx)
11✔
448
        if err != nil {
15✔
449
                return err
4✔
450
        }
4✔
451
        defer func() {
14✔
452
                err := stream.Close()
7✔
453
                if err != nil {
7✔
NEW
454
                        log.ErrorS(ctx, "Unable to close watch-only node "+
×
NEW
455
                                "connection", err)
×
NEW
456
                }
×
457
        }()
458

459
        // Once the stream has been created, we'll need to perform the handshake
460
        // process with the watch-only node, before it will start sending us
461
        // requests.
462
        err = r.handshake(ctx, stream)
7✔
463
        if err != nil {
9✔
464
                return err
2✔
465
        }
2✔
466

467
        log.InfoS(ctx, "Completed setup connection to watch-only node")
5✔
468

5✔
469
        // Reset the retry timeout after a successful connection.
5✔
470
        r.retryTimeout = defaultRetryTimeout
5✔
471

5✔
472
        return r.processSignRequestsForever(ctx, stream)
5✔
473
}
474

475
// handshake performs the handshake process with the watch-only node. As we are
476
// the initiator of the stream, we need to send the first message over the
477
// stream. The watch-only node will only proceed to sending us requests after
478
// the handshake has been completed.
479
func (r *OutboundClient) handshake(ctx context.Context, stream *Stream) error {
7✔
480
        // Derive a context that times out the handshake process, if it takes
7✔
481
        // longer than the request timeout.
7✔
482
        ctxt, cancel := context.WithTimeout(ctx, r.requestTimeout)
7✔
483
        defer cancel()
7✔
484

7✔
485
        var (
7✔
486
                msg     *walletrpc.SignCoordinatorRequest
7✔
487
                errChan = make(chan error, 1)
7✔
488
        )
7✔
489

7✔
490
        ok := r.gManager.Go(ctxt, func(_ context.Context) {
14✔
491
                // Send the registration message to the watch-only node.
7✔
492
                err := stream.Send(registrationMsg)
7✔
493
                if err != nil {
9✔
494
                        errChan <- err
2✔
495
                        return
2✔
496
                }
2✔
497

498
                // After the registration message has been sent, the signer node
499
                // will respond with a message indicating that it has accepted
500
                // the signer registration request if the registration was
501
                // successful.
502
                msg, err = stream.Recv()
5✔
503
                errChan <- err
5✔
504
        })
505
        if !ok {
7✔
NEW
506
                return fmt.Errorf("error sending registration message")
×
NEW
507
        }
×
508

509
        // Wait for the response.
510
        select {
7✔
511
        case <-ctxt.Done():
2✔
512
                return ctxt.Err()
2✔
513
        case err := <-errChan:
5✔
514
                if err != nil {
5✔
NEW
515
                        return fmt.Errorf("handshake error: %w", err)
×
NEW
516
                }
×
517
        }
518

519
        // Verify that the request ID of the response is the same as the
520
        // request ID of the registration message.
521
        if msg.GetRequestId() != handshakeRequestID {
5✔
NEW
522
                return fmt.Errorf("initial response request id must "+
×
NEW
523
                        "be %d, but is: %d", handshakeRequestID,
×
NEW
524
                        msg.GetRequestId())
×
NEW
525
        }
×
526

527
        // Check the type of the response message.
528
        resp, ok := msg.GetSignRequestType().(*registrationResp)
5✔
529
        if !ok {
5✔
NEW
530
                return fmt.Errorf("expected registration response, but got: %T",
×
NEW
531
                        msg.GetSignRequestType())
×
NEW
532
        }
×
533

534
        switch rType := resp.RegistrationResponse.
5✔
535
                GetRegistrationResponseType().(type) {
5✔
536
        // The registration was successful.
537
        case *completeType:
5✔
538
                // TODO(viktor): This should verify that the signature in the
5✔
539
                // complete message is valid.
5✔
540
                return nil
5✔
541

542
        // An error occurred during the registration process.
NEW
543
        case *walletrpc.RegistrationResponse_RegistrationError:
×
NEW
544
                return fmt.Errorf("registration error: %s",
×
NEW
545
                        rType.RegistrationError)
×
546

NEW
547
        default:
×
NEW
548
                return fmt.Errorf("unknown registration response type: %T",
×
NEW
549
                        resp.RegistrationResponse.GetRegistrationResponseType())
×
550
        }
551
}
552

553
// processSignRequestsForever processes and responds to the sign requests tha
554
// are sent over the stream. The function will continuously run until the
555
// remote signer client is either stopped or the stream errors.
556
func (r *OutboundClient) processSignRequestsForever(ctx context.Context,
557
        stream *Stream) error {
5✔
558

5✔
559
        for {
13✔
560
                err := r.processSingleSignReq(ctx, stream)
8✔
561
                if err != nil {
13✔
562
                        return err
5✔
563
                }
5✔
564

565
                select {
3✔
NEW
566
                case <-ctx.Done():
×
NEW
567
                        return ErrShuttingDown
×
568
                default:
3✔
569
                }
570
        }
571
}
572

573
// processSingleSignReq waits for and processes a single request from the
574
// watch-only node, and sends the corresponding response back.
575
func (r *OutboundClient) processSingleSignReq(ctx context.Context,
576
        stream *Stream) error {
8✔
577

8✔
578
        // Wait for a request from the watch-only node.
8✔
579
        req, err := r.waitForRequest(ctx, stream)
8✔
580
        if err != nil {
12✔
581
                return err
4✔
582
        }
4✔
583

584
        // Process the received request.
585
        resp := r.formResponse(ctx, req)
4✔
586

4✔
587
        // Send the response back to the watch-only node.
4✔
588
        return r.sendResponse(ctx, resp, stream)
4✔
589
}
590

591
// waitForRequest waits for a request from the watch-only node.
592
func (r *OutboundClient) waitForRequest(ctx context.Context, stream *Stream) (
593
        *walletrpc.SignCoordinatorRequest, error) {
8✔
594

8✔
595
        var (
8✔
596
                req     *walletrpc.SignCoordinatorRequest
8✔
597
                err     error
8✔
598
                errChan = make(chan error, 1)
8✔
599
        )
8✔
600

8✔
601
        // We run the stream.Recv() in a goroutine to ensure we can stop if the
8✔
602
        // remote signer client is shutting down (i.e. the quit channel is
8✔
603
        // closed). Shutting down the remote signer client will cancel the ctx,
8✔
604
        // which will cancel the stream context, which in turn will stop the
8✔
605
        // goroutine.
8✔
606
        ok := r.gManager.Go(ctx, func(_ context.Context) {
16✔
607
                req, err = stream.Recv()
8✔
608
                errChan <- err
8✔
609
        })
8✔
610
        if !ok {
8✔
NEW
611
                return nil, fmt.Errorf("error receiving request")
×
NEW
612
        }
×
613

614
        // Wait for the response and then handle it.
615
        select {
8✔
616
        case <-ctx.Done():
2✔
617
                return nil, ctx.Err()
2✔
618
        case err := <-errChan:
6✔
619
                if err != nil {
8✔
620
                        return nil, fmt.Errorf("error receiving request: %w",
2✔
621
                                err)
2✔
622
                }
2✔
623
        }
624

625
        return req, nil
4✔
626
}
627

628
// formResponse processes the received request from the watch-only node, and
629
// sends the corresponding response back.
630
func (r *OutboundClient) formResponse(ctx context.Context,
631
        req *walletrpc.SignCoordinatorRequest) *signerResponse {
4✔
632

4✔
633
        resp, err := r.process(ctx, req)
4✔
634
        if err != nil {
5✔
635
                r.log.ErrorS(ctx, "could not process request", err)
1✔
636

1✔
637
                // If we fail to process the request, we will send a SignerError
1✔
638
                // back to the watch-only node, indicating the nature of the
1✔
639
                // error.
1✔
640
                eType := &walletrpc.SignCoordinatorResponse_SignerError{
1✔
641
                        SignerError: &walletrpc.SignerError{
1✔
642
                                Error: "error processing the request in the " +
1✔
643
                                        "remote signer: " + err.Error(),
1✔
644
                        },
1✔
645
                }
1✔
646

1✔
647
                resp = &signerResponse{
1✔
648
                        RefRequestId:     req.GetRequestId(),
1✔
649
                        SignResponseType: eType,
1✔
650
                }
1✔
651
        }
1✔
652

653
        return resp
4✔
654
}
655

656
// process sends the passed request on to the appropriate server for processing
657
// it, and returns the response.
658
func (r *OutboundClient) process(ctx context.Context,
659
        req *walletrpc.SignCoordinatorRequest) (*signerResponse, error) {
4✔
660

4✔
661
        r.log.DebugS(ctx, "Processing a request from watch-only",
4✔
662
                btclog.Fmt("request_type", "%T", req.GetSignRequestType()))
4✔
663

4✔
664
        r.log.TraceS(ctx, "Request content",
4✔
665
                "content", formatSignCoordinatorMsg(req))
4✔
666

4✔
667
        var (
4✔
668
                requestID = req.GetRequestId()
4✔
669
                signResp  = &signerResponse{
4✔
670
                        RefRequestId: requestID,
4✔
671
                }
4✔
672
        )
4✔
673

4✔
674
        //nolint:ll
4✔
675
        switch reqType := req.GetSignRequestType().(type) {
4✔
NEW
676
        case *walletrpc.SignCoordinatorRequest_SharedKeyRequest:
×
NEW
677
                resp, err := r.signerServer.DeriveSharedKey(
×
NEW
678
                        ctx, reqType.SharedKeyRequest,
×
NEW
679
                )
×
NEW
680
                if err != nil {
×
NEW
681
                        return nil, err
×
NEW
682
                }
×
683

NEW
684
                rType := &walletrpc.SignCoordinatorResponse_SharedKeyResponse{
×
NEW
685
                        SharedKeyResponse: resp,
×
NEW
686
                }
×
NEW
687

×
NEW
688
                signResp.SignResponseType = rType
×
NEW
689

×
NEW
690
                return signResp, nil
×
691

692
        case *walletrpc.SignCoordinatorRequest_SignMessageReq:
1✔
693
                resp, err := r.signerServer.SignMessage(
1✔
694
                        ctx, reqType.SignMessageReq,
1✔
695
                )
1✔
696
                if err != nil {
2✔
697
                        return nil, err
1✔
698
                }
1✔
699

NEW
700
                rType := &walletrpc.SignCoordinatorResponse_SignMessageResp{
×
NEW
701
                        SignMessageResp: resp,
×
NEW
702
                }
×
NEW
703

×
NEW
704
                signResp.SignResponseType = rType
×
NEW
705

×
NEW
706
                return signResp, nil
×
707

NEW
708
        case *walletrpc.SignCoordinatorRequest_MuSig2SessionRequest:
×
NEW
709
                resp, err := r.signerServer.MuSig2CreateSession(
×
NEW
710
                        ctx, reqType.MuSig2SessionRequest,
×
NEW
711
                )
×
NEW
712
                if err != nil {
×
NEW
713
                        return nil, err
×
NEW
714
                }
×
715

NEW
716
                rType := &walletrpc.SignCoordinatorResponse_MuSig2SessionResponse{
×
NEW
717
                        MuSig2SessionResponse: resp,
×
NEW
718
                }
×
NEW
719

×
NEW
720
                signResp.SignResponseType = rType
×
NEW
721

×
NEW
722
                return signResp, nil
×
723

NEW
724
        case *walletrpc.SignCoordinatorRequest_MuSig2RegisterNoncesRequest:
×
NEW
725
                resp, err := r.signerServer.MuSig2RegisterNonces(
×
NEW
726
                        ctx, reqType.MuSig2RegisterNoncesRequest,
×
NEW
727
                )
×
NEW
728
                if err != nil {
×
NEW
729
                        return nil, err
×
NEW
730
                }
×
731

NEW
732
                rType := &walletrpc.SignCoordinatorResponse_MuSig2RegisterNoncesResponse{
×
NEW
733
                        MuSig2RegisterNoncesResponse: resp,
×
NEW
734
                }
×
NEW
735

×
NEW
736
                signResp.SignResponseType = rType
×
NEW
737

×
NEW
738
                return signResp, nil
×
739

NEW
740
        case *walletrpc.SignCoordinatorRequest_MuSig2SignRequest:
×
NEW
741
                resp, err := r.signerServer.MuSig2Sign(
×
NEW
742
                        ctx, reqType.MuSig2SignRequest,
×
NEW
743
                )
×
NEW
744
                if err != nil {
×
NEW
745
                        return nil, err
×
NEW
746
                }
×
747

NEW
748
                rType := &walletrpc.SignCoordinatorResponse_MuSig2SignResponse{
×
NEW
749
                        MuSig2SignResponse: resp,
×
NEW
750
                }
×
NEW
751

×
NEW
752
                signResp.SignResponseType = rType
×
NEW
753

×
NEW
754
                return signResp, nil
×
755

NEW
756
        case *walletrpc.SignCoordinatorRequest_MuSig2CombineSigRequest:
×
NEW
757
                resp, err := r.signerServer.MuSig2CombineSig(
×
NEW
758
                        ctx, reqType.MuSig2CombineSigRequest,
×
NEW
759
                )
×
NEW
760
                if err != nil {
×
NEW
761
                        return nil, err
×
NEW
762
                }
×
763

NEW
764
                rType := &walletrpc.SignCoordinatorResponse_MuSig2CombineSigResponse{
×
NEW
765
                        MuSig2CombineSigResponse: resp,
×
NEW
766
                }
×
NEW
767

×
NEW
768
                signResp.SignResponseType = rType
×
NEW
769

×
NEW
770
                return signResp, nil
×
771

NEW
772
        case *walletrpc.SignCoordinatorRequest_MuSig2CleanupRequest:
×
NEW
773
                resp, err := r.signerServer.MuSig2Cleanup(
×
NEW
774
                        ctx, reqType.MuSig2CleanupRequest,
×
NEW
775
                )
×
NEW
776
                if err != nil {
×
NEW
777
                        return nil, err
×
NEW
778
                }
×
779

NEW
780
                rType := &walletrpc.SignCoordinatorResponse_MuSig2CleanupResponse{
×
NEW
781
                        MuSig2CleanupResponse: resp,
×
NEW
782
                }
×
NEW
783

×
NEW
784
                signResp.SignResponseType = rType
×
NEW
785

×
NEW
786
                return signResp, nil
×
787

NEW
788
        case *walletrpc.SignCoordinatorRequest_SignPsbtRequest:
×
NEW
789
                resp, err := r.walletServer.SignPsbt(
×
NEW
790
                        ctx, reqType.SignPsbtRequest,
×
NEW
791
                )
×
NEW
792
                if err != nil {
×
NEW
793
                        return nil, err
×
NEW
794
                }
×
795

NEW
796
                rType := &walletrpc.SignCoordinatorResponse_SignPsbtResponse{
×
NEW
797
                        SignPsbtResponse: resp,
×
NEW
798
                }
×
NEW
799

×
NEW
800
                signResp.SignResponseType = rType
×
NEW
801

×
NEW
802
                return signResp, nil
×
803

804
        case *walletrpc.SignCoordinatorRequest_Ping:
3✔
805
                // If the received request is a ping, we don't need to pass the
3✔
806
                // request on to a server, but can respond with a pong directly.
3✔
807
                rType := &walletrpc.SignCoordinatorResponse_Pong{
3✔
808
                        Pong: true,
3✔
809
                }
3✔
810

3✔
811
                signResp.SignResponseType = rType
3✔
812

3✔
813
                return signResp, nil
3✔
814

NEW
815
        default:
×
NEW
816
                return nil, ErrRequestType
×
817
        }
818
}
819

820
// sendResponse sends the passed response back to the watch-only node over the
821
// stream.
822
func (r *OutboundClient) sendResponse(ctx context.Context, resp *signerResponse,
823
        stream *Stream) error {
4✔
824

4✔
825
        // Timeout sending the response if it takes too long.
4✔
826
        ctxt, cancel := r.cg.Create(
4✔
827
                ctx, fn.WithCustomTimeoutCG(r.requestTimeout),
4✔
828
        )
4✔
829
        defer cancel()
4✔
830

4✔
831
        var errChan = make(chan error, 1)
4✔
832

4✔
833
        // We send the response in a goroutine to ensure we can return an error
4✔
834
        // if the send times out or we shut down. This is done to ensure that
4✔
835
        // this function won't block indefinitely.
4✔
836
        ok := r.gManager.Go(ctxt, func(ctxt context.Context) {
8✔
837
                errChan <- stream.Send(resp)
4✔
838
        })
4✔
839
        if !ok {
4✔
NEW
840
                return fmt.Errorf("error sending response")
×
NEW
841
        }
×
842

843
        select {
4✔
844
        case <-ctxt.Done():
1✔
845
                return ctxt.Err()
1✔
846

847
        case err := <-errChan:
3✔
848
                if err != nil {
3✔
NEW
849
                        return fmt.Errorf("error sending response: %w", err)
×
NEW
850
                }
×
851
        }
852

853
        r.log.TraceS(ctxt, "Sent response to watch-only node",
3✔
854
                btclog.ClosureAttr("response", formatSignCoordinatorMsg(resp)))
3✔
855

3✔
856
        return nil
3✔
857
}
858

859
// A compile time assertion to ensure OutboundClient meets the
860
// RemoteSignerClient interface.
861
var _ RemoteSignerClient = (*OutboundClient)(nil)
862

863
// formatSignCoordinatorMsg formats the passed proto message into a JSON string.
864
func formatSignCoordinatorMsg(msg protoreflect.ProtoMessage) btclog.Closure {
25✔
865
        return func() string {
25✔
NEW
866
                jsonBytes, err := lnrpc.ProtoJSONMarshalOpts.Marshal(msg)
×
NEW
867
                if err != nil {
×
NEW
868
                        return fmt.Sprintf("<err: %v>", err.Error())
×
NEW
869
                }
×
870

NEW
871
                return string(jsonBytes)
×
872
        }
873
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc