• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14000719599

21 Mar 2025 08:54PM UTC coverage: 58.717% (-10.3%) from 68.989%
14000719599

Pull #8754

github

web-flow
Merge 29f363f18 into 5235f3b24
Pull Request #8754: Add `Outbound` Remote Signer implementation

1562 of 2088 new or added lines in 41 files covered. (74.81%)

28126 existing lines in 464 files now uncovered.

97953 of 166822 relevant lines covered (58.72%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.67
/lnwallet/rpcwallet/sign_coordinator.go
1
package rpcwallet
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "google.golang.org/grpc/codes"
8
        "google.golang.org/grpc/status"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/lightningnetwork/lnd/lnrpc/signrpc"
14
        "github.com/lightningnetwork/lnd/lnrpc/walletrpc"
15
        "github.com/lightningnetwork/lnd/lnutils"
16
        "google.golang.org/grpc"
17
)
18

19
var (
20
        // ErrRequestTimeout is the error that's returned if we time out while
21
        // waiting for a response from the remote signer.
22
        ErrRequestTimeout = errors.New("remote signer response timeout reached")
23

24
        // ErrConnectTimeout is the error that's returned if we time out while
25
        // waiting for the remote signer to connect.
26
        ErrConnectTimeout = errors.New("timed out when waiting for remote " +
27
                "signer to connect")
28

29
        // ErrMultipleConnections is the error that's returned if another
30
        // remote signer attempts to connect while we already have one
31
        // connected.
32
        ErrMultipleConnections = errors.New("only one remote signer can be " +
33
                "connected")
34

35
        // ErrNotConnected is the error that's returned if the remote signer
36
        // closes the stream or we encounter an error when receiving over the
37
        // stream.
38
        ErrNotConnected = errors.New("the remote signer is no longer connected")
39

40
        // ErrUnexpectedResponse is the error that's returned if the response
41
        // with the expected request ID from the remote signer is of an
42
        // unexpected type.
43
        ErrUnexpectedResponse = errors.New("unexpected response type")
44
)
45

46
const (
47
        // noTimeout is a constant that can be used to indicate that no timeout
48
        // should be enforced when waiting for a response from the remote
49
        // signer.
50
        noTimeout = 0
51
)
52

53
// SignCoordinator is an implementation of the signrpc.SignerClient and the
54
// walletrpc.WalletKitClient interfaces that passes on all requests to a remote
55
// signer. It is used by the watch-only wallet to delegate any signing or ECDH
56
// operations to a remote node over a
57
// walletrpc.WalletKit_SignCoordinatorStreamsServer stream. The stream is set up
58
// by the remote signer when it connects to the watch-only wallet, which should
59
// execute the Run method.
60
type SignCoordinator struct {
61
        // nextRequestID keeps track of the next request ID that should
62
        // be used when sending a request to the remote signer.
63
        nextRequestID atomic.Uint64
64

65
        // stream is a bi-directional stream between us and the remote signer.
66
        stream StreamServer
67

68
        // responses is a map of request IDs to response channels. This map
69
        // should be populated with a response channel for each request that has
70
        // been sent to the remote signer. The response channel should be
71
        // inserted into the map before the request is sent.
72
        // Any response received over the stream that does not have an
73
        // associated response channel in this map is ignored.
74
        // The response channel should be removed from the map when the response
75
        // has been received and processed.
76
        responses *lnutils.SyncMap[uint64, chan *signerResponse]
77

78
        // receiveErrChan is used to signal that the stream with the remote
79
        // signer has errored, and we can no longer process responses.
80
        receiveErrChan chan error
81

82
        // disconnected is closed when either party terminates and signals to
83
        // any pending requests that we'll no longer process the response for
84
        // that request.
85
        disconnected chan struct{}
86

87
        // quit is closed when lnd is shutting down.
88
        quit chan struct{}
89

90
        // clientReady is sent over when the remote signer is connected and
91
        // ready to accept requests (after the initial handshake).
92
        clientReady chan struct{}
93

94
        // clientConnected is sent over once a remote signer connects.
95
        clientConnected chan struct{}
96

97
        // requestTimeout is the maximum time we will wait for a response from
98
        // the remote signer.
99
        requestTimeout time.Duration
100

101
        // connectionTimeout is the maximum time we will wait for the remote
102
        // signer to connect.
103
        connectionTimeout time.Duration
104

105
        mu sync.Mutex
106

107
        wg sync.WaitGroup
108
}
109

110
// A compile time assertion to ensure SignCoordinator meets the
111
// RemoteSignerRequests interface.
112
var _ RemoteSignerRequests = (*SignCoordinator)(nil)
113

114
// NewSignCoordinator creates a new instance of the SignCoordinator.
115
func NewSignCoordinator(requestTimeout time.Duration,
116
        connectionTimeout time.Duration) *SignCoordinator {
3✔
117

3✔
118
        respsMap := &lnutils.SyncMap[uint64, chan *signerResponse]{}
3✔
119

3✔
120
        s := &SignCoordinator{
3✔
121
                responses:         respsMap,
3✔
122
                receiveErrChan:    make(chan error, 1),
3✔
123
                clientReady:       make(chan struct{}),
3✔
124
                clientConnected:   make(chan struct{}),
3✔
125
                quit:              make(chan struct{}),
3✔
126
                requestTimeout:    requestTimeout,
3✔
127
                connectionTimeout: connectionTimeout,
3✔
128
                // Note that the disconnected channel is not initialized here,
3✔
129
                // as no code listens to it until the Run method has been called
3✔
130
                // and set the field.
3✔
131
        }
3✔
132

3✔
133
        // We initialize the atomic nextRequestID to the handshakeRequestID, as
3✔
134
        // requestID 1 is reserved for the initial handshake by the remote
3✔
135
        // signer.
3✔
136
        s.nextRequestID.Store(handshakeRequestID)
3✔
137

3✔
138
        return s
3✔
139
}
3✔
140

141
// Run starts the SignCoordinator and blocks until the remote signer either
142
// disconnects, the SignCoordinator is shut down, or an error occurs.
143
func (s *SignCoordinator) Run(stream StreamServer) error {
3✔
144
        s.mu.Lock()
3✔
145

3✔
146
        select {
3✔
NEW
147
        case <-s.quit:
×
NEW
148
                s.mu.Unlock()
×
NEW
149
                return ErrShuttingDown
×
150

NEW
151
        case <-s.clientConnected:
×
NEW
152
                s.mu.Unlock()
×
NEW
153

×
NEW
154
                // If we already have a stream, we error out as we can only have
×
NEW
155
                // one connection at a time.
×
NEW
156
                return ErrMultipleConnections
×
157

158
        default:
3✔
159
        }
160

161
        s.wg.Add(1)
3✔
162
        defer s.wg.Done()
3✔
163

3✔
164
        close(s.clientConnected)
3✔
165
        defer func() {
6✔
166
                s.mu.Lock()
3✔
167
                defer s.mu.Unlock()
3✔
168

3✔
169
                // We create a new clientConnected channel, once this function
3✔
170
                // has exited, to ensure that a new remote signer connection can
3✔
171
                // be set up.
3✔
172
                s.clientConnected = make(chan struct{})
3✔
173
        }()
3✔
174

175
        s.stream = stream
3✔
176

3✔
177
        s.disconnected = make(chan struct{})
3✔
178
        defer close(s.disconnected)
3✔
179

3✔
180
        s.mu.Unlock()
3✔
181

3✔
182
        // The handshake must be completed before we can start sending requests
3✔
183
        // to the remote signer.
3✔
184
        err := s.handshake(stream)
3✔
185
        if err != nil {
3✔
NEW
186
                return err
×
NEW
187
        }
×
188

189
        log.Infof("Remote signer connected and ready")
3✔
190

3✔
191
        close(s.clientReady)
3✔
192
        defer func() {
6✔
193
                s.mu.Lock()
3✔
194
                defer s.mu.Unlock()
3✔
195

3✔
196
                // We create a new clientReady channel, once this function
3✔
197
                // has exited, to ensure that a new remote signer connection can
3✔
198
                // be set up.
3✔
199
                s.clientReady = make(chan struct{})
3✔
200
        }()
3✔
201

202
        // Now let's start the main receiving loop, which will receive all
203
        // responses to our requests from the remote signer!
204
        // We start the receiving loop in a goroutine to ensure that this
205
        // function exits if the SignCoordinator is shut down (i.e. the s.quit
206
        // channel is closed). Returning from this function will cause the
207
        // stream to be closed, which in turn will cause the receiving loop to
208
        // exit.
209
        s.wg.Add(1)
3✔
210
        go s.StartReceiving()
3✔
211

3✔
212
        select {
3✔
213
        case err := <-s.receiveErrChan:
3✔
214
                return err
3✔
215

216
        case <-s.quit:
3✔
217
                return ErrShuttingDown
3✔
218
        }
219
}
220

221
// Stop shuts down the SignCoordinator and waits until the main receiving loop
222
// has exited and all pending requests have been terminated.
223
func (s *SignCoordinator) Stop() {
3✔
224
        log.Infof("Stopping Sign Coordinator")
3✔
225
        defer log.Debugf("Sign coordinator stopped")
3✔
226

3✔
227
        // We lock the mutex before closing the quit channel to ensure that we
3✔
228
        // can't get a concurrent request into the SignCoordinator while we're
3✔
229
        // stopping it. That will ensure that the s.wg.Wait() call below will
3✔
230
        // always wait for any ongoing requests to finish before we return.
3✔
231
        s.mu.Lock()
3✔
232

3✔
233
        close(s.quit)
3✔
234

3✔
235
        s.mu.Unlock()
3✔
236

3✔
237
        s.wg.Wait()
3✔
238
}
3✔
239

240
// handshake performs the initial handshake with the remote signer. This must
241
// be done before any other requests are sent to the remote signer.
242
func (s *SignCoordinator) handshake(stream StreamServer) error {
3✔
243
        var (
3✔
244
                registerChan     = make(chan *walletrpc.SignerRegistration)
3✔
245
                registerDoneChan = make(chan struct{})
3✔
246
                errChan          = make(chan error)
3✔
247
        )
3✔
248

3✔
249
        // Create a context with a timeout using the context from the stream as
3✔
250
        // the parent context. This ensures that we'll exit if either the stream
3✔
251
        // is closed by the remote signer or if we time out.
3✔
252
        ctxt, cancel := context.WithTimeout(
3✔
253
                stream.Context(), s.requestTimeout,
3✔
254
        )
3✔
255
        defer cancel()
3✔
256

3✔
257
        // Read the first message in a goroutine because the Recv method blocks
3✔
258
        // until the message arrives.
3✔
259
        s.wg.Add(1)
3✔
260
        go func() {
6✔
261
                defer s.wg.Done()
3✔
262

3✔
263
                msg, err := stream.Recv()
3✔
264
                if err != nil {
3✔
NEW
265
                        select {
×
NEW
266
                        case errChan <- err:
×
NEW
267
                        case <-ctxt.Done():
×
268
                        }
269

NEW
270
                        return
×
271
                }
272

273
                if msg.GetRefRequestId() != handshakeRequestID {
3✔
NEW
274
                        err = fmt.Errorf("initial request ID must be %d, "+
×
NEW
275
                                "but is: %d", handshakeRequestID,
×
NEW
276
                                msg.GetRefRequestId())
×
NEW
277

×
NEW
278
                        select {
×
NEW
279
                        case errChan <- err:
×
NEW
280
                        case <-ctxt.Done():
×
281
                        }
282

NEW
283
                        return
×
284
                }
285

286
                switch req := msg.GetSignResponseType().(type) {
3✔
287
                case *walletrpc.SignCoordinatorResponse_SignerRegistration:
3✔
288
                        select {
3✔
289
                        case registerChan <- req.SignerRegistration:
3✔
NEW
290
                        case <-ctxt.Done():
×
291
                        }
292

293
                        return
3✔
294

NEW
295
                default:
×
NEW
296
                        err := fmt.Errorf("expected registration message, "+
×
NEW
297
                                "but got: %T", req)
×
NEW
298

×
NEW
299
                        select {
×
NEW
300
                        case errChan <- err:
×
NEW
301
                        case <-ctxt.Done():
×
302
                        }
303

NEW
304
                        return
×
305
                }
306
        }()
307

308
        // Wait for the initial message to arrive or time out if it takes too
309
        // long. The initial message must be a registration message from the
310
        // remote signer.
311
        select {
3✔
312
        case signerRegistration := <-registerChan:
3✔
313
                // TODO(viktor): This could be extended to validate the version
3✔
314
                // of the remote signer in the future.
3✔
315
                if signerRegistration.GetRegistrationInfo() == "" {
3✔
NEW
316
                        return errors.New("invalid remote signer " +
×
NEW
317
                                "registration info")
×
NEW
318
                }
×
319

320
                // Todo(viktor): The RegistrationChallenge in the
321
                // signerRegistration should likely also be signed here.
322

NEW
323
        case err := <-errChan:
×
NEW
324
                return fmt.Errorf("error receiving initial remote signer "+
×
NEW
325
                        "registration message: %v", err)
×
326

NEW
327
        case <-s.quit:
×
NEW
328
                return ErrShuttingDown
×
329

NEW
330
        case <-ctxt.Done():
×
NEW
331
                return ctxt.Err()
×
332
        }
333

334
        complete := &walletrpc.RegistrationResponse_RegistrationComplete{
3✔
335
                // TODO(viktor): The signature should be generated by signing
3✔
336
                // the RegistrationChallenge contained in the SignerRegistration
3✔
337
                // message in the future.
3✔
338
                // The RegistrationInfo could also be extended to include info
3✔
339
                // about the watch-only node in the future.
3✔
340
                RegistrationComplete: &walletrpc.RegistrationComplete{
3✔
341
                        Signature:        "",
3✔
342
                        RegistrationInfo: "watch-only registration info",
3✔
343
                },
3✔
344
        }
3✔
345
        // Send a message to the client to indicate that the registration has
3✔
346
        // successfully completed.
3✔
347
        req := &walletrpc.SignCoordinatorRequest_RegistrationResponse{
3✔
348
                RegistrationResponse: &walletrpc.RegistrationResponse{
3✔
349
                        RegistrationResponseType: complete,
3✔
350
                },
3✔
351
        }
3✔
352

3✔
353
        regCompleteMsg := &walletrpc.SignCoordinatorRequest{
3✔
354
                RequestId:       handshakeRequestID,
3✔
355
                SignRequestType: req,
3✔
356
        }
3✔
357

3✔
358
        // Send the message in a goroutine because the Send method blocks until
3✔
359
        // the message is read by the client.
3✔
360
        s.wg.Add(1)
3✔
361
        go func() {
6✔
362
                defer s.wg.Done()
3✔
363

3✔
364
                err := stream.Send(regCompleteMsg)
3✔
365
                if err != nil {
3✔
NEW
366
                        select {
×
NEW
367
                        case errChan <- err:
×
NEW
368
                        case <-ctxt.Done():
×
369
                        }
370

NEW
371
                        return
×
372
                }
373

374
                close(registerDoneChan)
3✔
375
        }()
376

377
        select {
3✔
NEW
378
        case err := <-errChan:
×
NEW
379
                return fmt.Errorf("error sending registration complete "+
×
NEW
380
                        " message to remote signer: %v", err)
×
381

NEW
382
        case <-ctxt.Done():
×
NEW
383
                return ctxt.Err()
×
384

NEW
385
        case <-s.quit:
×
NEW
386
                return ErrShuttingDown
×
387

388
        case <-registerDoneChan:
3✔
389
        }
390

391
        return nil
3✔
392
}
393

394
// StartReceiving is the main receive loop that receives responses from the
395
// remote signer. Responses must have a RequestID that corresponds to requests
396
// which are waiting for a response; otherwise, the response is ignored.
397
func (s *SignCoordinator) StartReceiving() {
3✔
398
        defer s.wg.Done()
3✔
399

3✔
400
        for {
6✔
401
                resp, err := s.stream.Recv()
3✔
402
                if err != nil {
6✔
403
                        select {
3✔
404
                        // If we've already shut down, the main Run method will
405
                        // not be able to receive any error sent over the error
406
                        // channel. So we just return.
407
                        case <-s.quit:
3✔
408

409
                        // Send the error over the error channel, so that the
410
                        // main Run method can return the error.
411
                        case s.receiveErrChan <- err:
3✔
412
                        }
413

414
                        return
3✔
415
                }
416

417
                respChan, ok := s.responses.Load(resp.GetRefRequestId())
3✔
418

3✔
419
                if ok {
6✔
420
                        select {
3✔
421
                        // We should always be able to send over the response
422
                        // channel, as the channel allows for a buffer of 1, and
423
                        // we shouldn't have multiple requests and responses for
424
                        // the same request ID.
425
                        case respChan <- resp:
3✔
426

NEW
427
                        case <-s.quit:
×
NEW
428
                                return
×
429

430
                        // The timeout case be unreachable, as we should always
431
                        // be able to send 1 response over the response channel.
432
                        // We keep this case just to avoid a scenario where the
433
                        // receive loop would be blocked if we receive multiple
434
                        // responses for the same request ID.
NEW
435
                        case <-time.After(s.requestTimeout):
×
436
                        }
437
                }
438

439
                // If there's no response channel, the thread waiting for the
440
                // response has most likely timed out. We therefore ignore the
441
                // response. The other scenario where we don't have a response
442
                // channel would be if we received a response for a request that
443
                // we didn't send. This should never happen, but if it does, we
444
                // ignore the response.
445

446
                select {
3✔
NEW
447
                case <-s.quit:
×
NEW
448
                        return
×
449
                default:
3✔
450
                }
451
        }
452
}
453

454
// WaitUntilConnected waits until the remote signer has connected. If the remote
455
// signer does not connect within the configured connection timeout, an error is
456
// returned.
457
func (s *SignCoordinator) WaitUntilConnected() error {
3✔
458
        return s.waitUntilConnectedWithTimeout(s.connectionTimeout)
3✔
459
}
3✔
460

461
// waitUntilConnectedWithTimeout waits until the remote signer has connected. If
462
// the remote signer does not connect within the given timeout, an error is
463
// returned.
464
func (s *SignCoordinator) waitUntilConnectedWithTimeout(
465
        timeout time.Duration) error {
3✔
466

3✔
467
        select {
3✔
468
        case <-s.clientReady:
3✔
469
                return nil
3✔
470

NEW
471
        case <-s.quit:
×
NEW
472
                return ErrShuttingDown
×
473

NEW
474
        case <-time.After(timeout):
×
NEW
475
                return ErrConnectTimeout
×
476
        }
477
}
478

479
// createResponseChannel creates a response channel for the given request ID and
480
// inserts it into the responses map. The function returns a cleanup function
481
// which removes the channel from the responses map, and the caller must ensure
482
// that this cleanup function is executed once the thread that's waiting for
483
// the response is done.
484
func (s *SignCoordinator) createResponseChannel(requestID uint64) func() {
3✔
485
        // Create a new response channel.
3✔
486
        respChan := make(chan *signerResponse, 1)
3✔
487

3✔
488
        // Insert the response channel into the map.
3✔
489
        s.responses.Store(requestID, respChan)
3✔
490

3✔
491
        // Create a cleanup function that will delete the response channel.
3✔
492
        return func() {
6✔
493
                select {
3✔
494
                // If we have timed out, there could be a very unlikely
495
                // scenario where we did receive a response before we managed to
496
                // grab the lock in the cleanup func. In that case, we'll just
497
                // ignore the response. We should still clean up the response
498
                // channel though.
NEW
499
                case <-respChan:
×
500
                default:
3✔
501
                }
502

503
                s.responses.Delete(requestID)
3✔
504
        }
505
}
506

507
// getResponse waits for a response with the given request ID and returns the
508
// response if it is received. If the corresponding response from the remote
509
// signer is a SignerError, the error message is returned. If the response is
510
// not received within the given timeout, an error is returned.
511
//
512
// Note: Before calling this function, the caller must have created a response
513
// channel for the request ID.
514
func (s *SignCoordinator) getResponse(requestID uint64,
515
        timeout time.Duration) (*signerResponse, error) {
3✔
516

3✔
517
        respChan, ok := s.responses.Load(requestID)
3✔
518

3✔
519
        // Verify that we have a response channel for the request ID.
3✔
520
        if !ok {
3✔
NEW
521
                // It should be impossible to reach this case, as we create the
×
NEW
522
                // response channel before sending the request.
×
NEW
523
                return nil, fmt.Errorf("no response channel found for "+
×
NEW
524
                        "request ID %d", requestID)
×
NEW
525
        }
×
526

527
        // Wait for the response to arrive.
528
        select {
3✔
529
        case resp, ok := <-respChan:
3✔
530
                if !ok {
3✔
NEW
531
                        // If the response channel was closed, we return an
×
NEW
532
                        // error as the receiving thread must have timed out
×
NEW
533
                        // before we managed to grab the response.
×
NEW
534
                        return nil, ErrRequestTimeout
×
NEW
535
                }
×
536

537
                // a temp type alias to limit the length of the line below.
538
                type sErr = walletrpc.SignCoordinatorResponse_SignerError
3✔
539

3✔
540
                // If the response is an error, we return the error message.
3✔
541
                if errorResp, ok := resp.GetSignResponseType().(*sErr); ok {
6✔
542
                        errStr := errorResp.SignerError.GetError()
3✔
543

3✔
544
                        log.Debugf("Received an error response from remote "+
3✔
545
                                "signer for request ID %d. Error: %v",
3✔
546
                                requestID, errStr)
3✔
547

3✔
548
                        return nil, errors.New(errStr)
3✔
549
                }
3✔
550

551
                log.Debugf("Received remote signer %T response for request "+
3✔
552
                        "ID %d", resp.GetSignResponseType(), requestID)
3✔
553

3✔
554
                log.Tracef("Remote signer response content: %v",
3✔
555
                        formatSignCoordinatorMsg(resp))
3✔
556

3✔
557
                return resp, nil
3✔
558

NEW
559
        case <-s.disconnected:
×
NEW
560
                log.Debugf("Stopped waiting for remote signer response for "+
×
NEW
561
                        "request ID %d as the stream has been closed",
×
NEW
562
                        requestID)
×
NEW
563

×
NEW
564
                return nil, ErrNotConnected
×
565

NEW
566
        case <-s.quit:
×
NEW
567
                log.Debugf("Stopped waiting for remote signer response for "+
×
NEW
568
                        "request ID %d as we're shutting down", requestID)
×
NEW
569

×
NEW
570
                return nil, ErrShuttingDown
×
571

NEW
572
        case <-time.After(timeout):
×
NEW
573
                log.Debugf("Remote signer response timed out for request ID %d",
×
NEW
574
                        requestID)
×
NEW
575

×
NEW
576
                return nil, ErrRequestTimeout
×
577
        }
578
}
579

580
// registerRequest registers a new request with the SignCoordinator, ensuring it
581
// awaits the handling of the request before shutting down. The function returns
582
// a Done function that must be executed once the request has been handled.
583
func (s *SignCoordinator) registerRequest() (func(), error) {
3✔
584
        // We lock the mutex to ensure that we can't have a race where we'd
3✔
585
        // register a request while shutting down.
3✔
586
        s.mu.Lock()
3✔
587
        defer s.mu.Unlock()
3✔
588

3✔
589
        select {
3✔
NEW
590
        case <-s.quit:
×
NEW
591
                return nil, ErrShuttingDown
×
592
        default:
3✔
593
        }
594

595
        s.wg.Add(1)
3✔
596

3✔
597
        return func() {
6✔
598
                s.wg.Done()
3✔
599
        }, nil
3✔
600
}
601

602
// Ping sends a ping request to the remote signer and waits for a pong response.
603
func (s *SignCoordinator) Ping(timeout time.Duration) (bool, error) {
2✔
604
        req := &walletrpc.SignCoordinatorRequest_Ping{
2✔
605
                Ping: true,
2✔
606
        }
2✔
607

2✔
608
        return processRequest(
2✔
609
                s, timeout, // As we're pinging, we specify a time limit.
2✔
610
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
4✔
611
                        return walletrpc.SignCoordinatorRequest{
2✔
612
                                RequestId:       reqId,
2✔
613
                                SignRequestType: req,
2✔
614
                        }
2✔
615
                },
2✔
616
                func(resp *signerResponse) bool {
2✔
617
                        return resp.GetPong()
2✔
618
                },
2✔
619
        )
620
}
621

622
// DeriveSharedKey sends a SharedKeyRequest to the remote signer and waits for
623
// the corresponding response.
624
//
625
// NOTE: This is part of the RemoteSignerRequests interface.
626
func (s *SignCoordinator) DeriveSharedKey(_ context.Context,
627
        in *signrpc.SharedKeyRequest,
628
        _ ...grpc.CallOption) (*signrpc.SharedKeyResponse, error) {
3✔
629

3✔
630
        req := &walletrpc.SignCoordinatorRequest_SharedKeyRequest{
3✔
631
                SharedKeyRequest: in,
3✔
632
        }
3✔
633

3✔
634
        return processRequest(
3✔
635
                s, noTimeout,
3✔
636
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
637
                        return walletrpc.SignCoordinatorRequest{
3✔
638
                                RequestId:       reqId,
3✔
639
                                SignRequestType: req,
3✔
640
                        }
3✔
641
                },
3✔
642
                func(resp *signerResponse) *signrpc.SharedKeyResponse {
3✔
643
                        return resp.GetSharedKeyResponse()
3✔
644
                },
3✔
645
        )
646
}
647

648
// MuSig2Cleanup sends a MuSig2CleanupRequest to the remote signer and waits for
649
// the corresponding response.
650
//
651
// NOTE: This is part of the RemoteSignerRequests interface.
652
func (s *SignCoordinator) MuSig2Cleanup(_ context.Context,
653
        in *signrpc.MuSig2CleanupRequest,
654
        _ ...grpc.CallOption) (*signrpc.MuSig2CleanupResponse, error) {
3✔
655

3✔
656
        req := &walletrpc.SignCoordinatorRequest_MuSig2CleanupRequest{
3✔
657
                MuSig2CleanupRequest: in,
3✔
658
        }
3✔
659

3✔
660
        return processRequest(
3✔
661
                s, noTimeout,
3✔
662
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
663
                        return walletrpc.SignCoordinatorRequest{
3✔
664
                                RequestId:       reqId,
3✔
665
                                SignRequestType: req,
3✔
666
                        }
3✔
667
                },
3✔
668
                func(resp *signerResponse) *signrpc.MuSig2CleanupResponse {
3✔
669
                        return resp.GetMuSig2CleanupResponse()
3✔
670
                },
3✔
671
        )
672
}
673

674
// MuSig2CombineSig sends a MuSig2CombineSigRequest to the remote signer and
675
// waits for the corresponding response.
676
//
677
// NOTE: This is part of the RemoteSignerRequests interface.
678
func (s *SignCoordinator) MuSig2CombineSig(_ context.Context,
679
        in *signrpc.MuSig2CombineSigRequest,
680
        _ ...grpc.CallOption) (*signrpc.MuSig2CombineSigResponse, error) {
3✔
681

3✔
682
        req := &walletrpc.SignCoordinatorRequest_MuSig2CombineSigRequest{
3✔
683
                MuSig2CombineSigRequest: in,
3✔
684
        }
3✔
685

3✔
686
        return processRequest(
3✔
687
                s, noTimeout,
3✔
688
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
689
                        return walletrpc.SignCoordinatorRequest{
3✔
690
                                RequestId:       reqId,
3✔
691
                                SignRequestType: req,
3✔
692
                        }
3✔
693
                },
3✔
694
                func(resp *signerResponse) *signrpc.MuSig2CombineSigResponse {
3✔
695
                        return resp.GetMuSig2CombineSigResponse()
3✔
696
                },
3✔
697
        )
698
}
699

700
// MuSig2CreateSession sends a MuSig2SessionRequest to the remote signer and
701
// waits for the corresponding response.
702
//
703
// NOTE: This is part of the RemoteSignerRequests interface.
704
func (s *SignCoordinator) MuSig2CreateSession(_ context.Context,
705
        in *signrpc.MuSig2SessionRequest,
706
        _ ...grpc.CallOption) (*signrpc.MuSig2SessionResponse, error) {
3✔
707

3✔
708
        req := &walletrpc.SignCoordinatorRequest_MuSig2SessionRequest{
3✔
709
                MuSig2SessionRequest: in,
3✔
710
        }
3✔
711

3✔
712
        return processRequest(
3✔
713
                s, noTimeout,
3✔
714
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
715
                        return walletrpc.SignCoordinatorRequest{
3✔
716
                                RequestId:       reqId,
3✔
717
                                SignRequestType: req,
3✔
718
                        }
3✔
719
                },
3✔
720
                func(resp *signerResponse) *signrpc.MuSig2SessionResponse {
3✔
721
                        return resp.GetMuSig2SessionResponse()
3✔
722
                },
3✔
723
        )
724
}
725

726
// MuSig2RegisterNonces sends a MuSig2RegisterNoncesRequest to the remote signer
727
// and waits for the corresponding response.
728
//
729
// NOTE: This is part of the RemoteSignerRequests interface.
730
func (s *SignCoordinator) MuSig2RegisterNonces(_ context.Context,
731
        in *signrpc.MuSig2RegisterNoncesRequest,
732
        _ ...grpc.CallOption) (*signrpc.MuSig2RegisterNoncesResponse,
733
        error) {
3✔
734

3✔
735
        req := &walletrpc.SignCoordinatorRequest_MuSig2RegisterNoncesRequest{
3✔
736
                MuSig2RegisterNoncesRequest: in,
3✔
737
        }
3✔
738

3✔
739
        type muSig2RegisterNoncesResp = *signrpc.MuSig2RegisterNoncesResponse
3✔
740

3✔
741
        return processRequest(
3✔
742
                s, noTimeout,
3✔
743
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
744
                        return walletrpc.SignCoordinatorRequest{
3✔
745
                                RequestId:       reqId,
3✔
746
                                SignRequestType: req,
3✔
747
                        }
3✔
748
                },
3✔
749
                func(resp *signerResponse) muSig2RegisterNoncesResp {
3✔
750
                        return resp.GetMuSig2RegisterNoncesResponse()
3✔
751
                },
3✔
752
        )
753
}
754

755
// MuSig2Sign sends a MuSig2SignRequest to the remote signer and waits for the
756
// corresponding response.
757
//
758
// NOTE: This is part of the RemoteSignerRequests interface.
759
func (s *SignCoordinator) MuSig2Sign(_ context.Context,
760
        in *signrpc.MuSig2SignRequest,
761
        _ ...grpc.CallOption) (*signrpc.MuSig2SignResponse, error) {
3✔
762

3✔
763
        req := &walletrpc.SignCoordinatorRequest_MuSig2SignRequest{
3✔
764
                MuSig2SignRequest: in,
3✔
765
        }
3✔
766

3✔
767
        return processRequest(
3✔
768
                s, noTimeout,
3✔
769
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
770
                        return walletrpc.SignCoordinatorRequest{
3✔
771
                                RequestId:       reqId,
3✔
772
                                SignRequestType: req,
3✔
773
                        }
3✔
774
                },
3✔
775
                func(resp *signerResponse) *signrpc.MuSig2SignResponse {
3✔
776
                        return resp.GetMuSig2SignResponse()
3✔
777
                },
3✔
778
        )
779
}
780

781
// SignMessage sends a SignMessageReq to the remote signer and waits for the
782
// corresponding response.
783
//
784
// NOTE: This is part of the RemoteSignerRequests interface.
785
func (s *SignCoordinator) SignMessage(_ context.Context,
786
        in *signrpc.SignMessageReq,
787
        _ ...grpc.CallOption) (*signrpc.SignMessageResp, error) {
3✔
788

3✔
789
        req := &walletrpc.SignCoordinatorRequest_SignMessageReq{
3✔
790
                SignMessageReq: in,
3✔
791
        }
3✔
792

3✔
793
        return processRequest(
3✔
794
                s, noTimeout,
3✔
795
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
796
                        return walletrpc.SignCoordinatorRequest{
3✔
797
                                RequestId:       reqId,
3✔
798
                                SignRequestType: req,
3✔
799
                        }
3✔
800
                },
3✔
801
                func(resp *signerResponse) *signrpc.SignMessageResp {
3✔
802
                        return resp.GetSignMessageResp()
3✔
803
                },
3✔
804
        )
805
}
806

807
// SignPsbt sends a SignPsbtRequest to the remote signer and waits for the
808
// corresponding response.
809
//
810
// NOTE: This is part of the RemoteSignerRequests interface.
811
func (s *SignCoordinator) SignPsbt(_ context.Context,
812
        in *walletrpc.SignPsbtRequest,
813
        _ ...grpc.CallOption) (*walletrpc.SignPsbtResponse, error) {
3✔
814

3✔
815
        req := &walletrpc.SignCoordinatorRequest_SignPsbtRequest{
3✔
816
                SignPsbtRequest: in,
3✔
817
        }
3✔
818

3✔
819
        return processRequest(
3✔
820
                s, noTimeout,
3✔
821
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
822
                        return walletrpc.SignCoordinatorRequest{
3✔
823
                                RequestId:       reqId,
3✔
824
                                SignRequestType: req,
3✔
825
                        }
3✔
826
                },
3✔
827
                func(resp *signerResponse) *walletrpc.SignPsbtResponse {
3✔
828
                        return resp.GetSignPsbtResponse()
3✔
829
                },
3✔
830
        )
831
}
832

833
// processRequest is a generic function that sends a request to the remote
834
// signer and waits for the corresponding response. If a timeout is set, the
835
// function will limit the execution time of the entire function to the
836
// specified timeout. If it is not set, configured timeouts will be used for
837
// the individual operations within the function.
838
func processRequest[R comparable](s *SignCoordinator, timeout time.Duration,
839
        generateRequest func(uint64) walletrpc.SignCoordinatorRequest,
840
        extractResponse func(*signerResponse) R) (R, error) {
3✔
841

3✔
842
        var zero R
3✔
843

3✔
844
        done, err := s.registerRequest()
3✔
845
        if err != nil {
3✔
NEW
846
                return zero, err
×
NEW
847
        }
×
848
        defer done()
3✔
849

3✔
850
        startTime := time.Now()
3✔
851

3✔
852
        // If a timeout is enforced, we will wait for the connection using the
3✔
853
        // specified timeout. Otherwise, we will wait for the connection using
3✔
854
        // the configured connection timeout.
3✔
855
        if timeout != 0 {
5✔
856
                err = s.waitUntilConnectedWithTimeout(timeout)
2✔
857
        } else {
5✔
858
                err = s.WaitUntilConnected()
3✔
859
        }
3✔
860

861
        if err != nil {
3✔
NEW
862
                return zero, err
×
NEW
863
        }
×
864

865
        reqID := s.nextRequestID.Add(1)
3✔
866
        req := generateRequest(reqID)
3✔
867

3✔
868
        cleanUpChannel := s.createResponseChannel(reqID)
3✔
869
        defer cleanUpChannel()
3✔
870

3✔
871
        log.Debugf("Sending a %T to the remote signer with request ID %d",
3✔
872
                req.SignRequestType, reqID)
3✔
873

3✔
874
        log.Tracef("Request content: %v", formatSignCoordinatorMsg(&req))
3✔
875

3✔
876
        // reprocessOnDisconnect is a helper function that will be used to
3✔
877
        // resend the request if the remote signer disconnects, through which
3✔
878
        // we will wait for it to reconnect within the configured timeout, and
3✔
879
        // then resend the request.
3✔
880
        reprocessOnDisconnect := func() (R, error) {
3✔
NEW
881
                var newTimeout time.Duration = noTimeout
×
NEW
882

×
NEW
883
                if timeout != 0 {
×
NEW
884
                        newTimeout = timeout - time.Since(startTime)
×
NEW
885

×
NEW
886
                        if time.Since(startTime) > timeout {
×
NEW
887
                                return zero, ErrRequestTimeout
×
NEW
888
                        }
×
889
                }
890

NEW
891
                return processRequest[R](
×
NEW
892
                        s, newTimeout, generateRequest, extractResponse,
×
NEW
893
                )
×
894
        }
895

896
        err = s.stream.Send(&req)
3✔
897
        if err != nil {
3✔
NEW
898
                st, isStatusError := status.FromError(err)
×
NEW
899
                if isStatusError && st.Code() == codes.Unavailable {
×
NEW
900
                        // If the stream was closed due to the remote signer
×
NEW
901
                        // disconnecting, we will retry to process the request
×
NEW
902
                        // if the remote signer reconnects.
×
NEW
903
                        return reprocessOnDisconnect()
×
NEW
904
                }
×
905

NEW
906
                return zero, err
×
907
        }
908

909
        var resp *walletrpc.SignCoordinatorResponse
3✔
910

3✔
911
        // If a timeout is enforced, we need to limit the entire execution time
3✔
912
        // of this function to the timeout. Therefore, we need to calculate the
3✔
913
        // remaining allowed execution time.
3✔
914
        // If no timeout is enforced, we will wait for the response using the
3✔
915
        // configured request timeout.
3✔
916
        if timeout != 0 {
5✔
917
                newTimeout := timeout - time.Since(startTime)
2✔
918

2✔
919
                if time.Since(startTime) > timeout {
2✔
NEW
920
                        return zero, ErrRequestTimeout
×
NEW
921
                }
×
922

923
                resp, err = s.getResponse(reqID, newTimeout)
2✔
924
        } else {
3✔
925
                resp, err = s.getResponse(reqID, s.requestTimeout)
3✔
926
        }
3✔
927

928
        if errors.Is(err, ErrNotConnected) {
3✔
NEW
929
                // If the remote signer disconnected while we were waiting for
×
NEW
930
                // the response, we will retry to process the request if the
×
NEW
931
                // remote signer reconnects.
×
NEW
932
                return reprocessOnDisconnect()
×
933
        } else if err != nil {
6✔
934
                return zero, err
3✔
935
        }
3✔
936

937
        rpcResp := extractResponse(resp)
3✔
938
        if rpcResp == zero {
3✔
NEW
939
                return zero, ErrUnexpectedResponse
×
NEW
940
        }
×
941

942
        return rpcResp, nil
3✔
943
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc