• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13974489001

20 Mar 2025 04:32PM UTC coverage: 56.292% (-2.9%) from 59.168%
13974489001

Pull #8754

github

web-flow
Merge aed149e6b into ea050d06f
Pull Request #8754: Add `Outbound` Remote Signer implementation

594 of 1713 new or added lines in 26 files covered. (34.68%)

23052 existing lines in 272 files now uncovered.

105921 of 188165 relevant lines covered (56.29%)

23796.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.32
/lnwallet/rpcwallet/sign_coordinator.go
1
package rpcwallet
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/lnrpc/signrpc"
12
        "github.com/lightningnetwork/lnd/lnrpc/walletrpc"
13
        "github.com/lightningnetwork/lnd/lnutils"
14
        "google.golang.org/grpc"
15
)
16

17
var (
18
        // ErrRequestTimeout is the error that's returned if we time out while
19
        // waiting for a response from the remote signer.
20
        ErrRequestTimeout = errors.New("remote signer response timeout reached")
21

22
        // ErrConnectTimeout is the error that's returned if we time out while
23
        // waiting for the remote signer to connect.
24
        ErrConnectTimeout = errors.New("timed out when waiting for remote " +
25
                "signer to connect")
26

27
        // ErrMultipleConnections is the error that's returned if another
28
        // remote signer attempts to connect while we already have one
29
        // connected.
30
        ErrMultipleConnections = errors.New("only one remote signer can be " +
31
                "connected")
32

33
        // ErrNotConnected is the error that's returned if the remote signer
34
        // closes the stream or we encounter an error when receiving over the
35
        // stream.
36
        ErrNotConnected = errors.New("the remote signer is no longer connected")
37

38
        // ErrUnexpectedResponse is the error that's returned if the response
39
        // with the expected request ID from the remote signer is of an
40
        // unexpected type.
41
        ErrUnexpectedResponse = errors.New("unexpected response type")
42
)
43

44
const (
45
        // noTimeout is a constant that can be used to indicate that no timeout
46
        // should be enforced when waiting for a response from the remote
47
        // signer.
48
        noTimeout = 0
49
)
50

51
// SignCoordinator is an implementation of the signrpc.SignerClient and the
52
// walletrpc.WalletKitClient interfaces that passes on all requests to a remote
53
// signer. It is used by the watch-only wallet to delegate any signing or ECDH
54
// operations to a remote node over a
55
// walletrpc.WalletKit_SignCoordinatorStreamsServer stream. The stream is set up
56
// by the remote signer when it connects to the watch-only wallet, which should
57
// execute the Run method.
58
type SignCoordinator struct {
59
        // nextRequestID keeps track of the next request ID that should
60
        // be used when sending a request to the remote signer.
61
        nextRequestID atomic.Uint64
62

63
        // stream is a bi-directional stream between us and the remote signer.
64
        stream StreamServer
65

66
        // responses is a map of request IDs to response channels. This map
67
        // should be populated with a response channel for each request that has
68
        // been sent to the remote signer. The response channel should be
69
        // inserted into the map before the request is sent.
70
        // Any response received over the stream that does not have an
71
        // associated response channel in this map is ignored.
72
        // The response channel should be removed from the map when the response
73
        // has been received and processed.
74
        responses *lnutils.SyncMap[uint64, chan *signerResponse]
75

76
        // receiveErrChan is used to signal that the stream with the remote
77
        // signer has errored, and we can no longer process responses.
78
        receiveErrChan chan error
79

80
        // doneReceiving is closed when either party terminates and signals to
81
        // any pending requests that we'll no longer process the response for
82
        // that request.
83
        doneReceiving chan struct{}
84

85
        // quit is closed when lnd is shutting down.
86
        quit chan struct{}
87

88
        // clientConnected is sent over when the remote signer connects.
89
        clientConnected chan struct{}
90

91
        // requestTimeout is the maximum time we will wait for a response from
92
        // the remote signer.
93
        requestTimeout time.Duration
94

95
        // connectionTimeout is the maximum time we will wait for the remote
96
        // signer to connect.
97
        connectionTimeout time.Duration
98

99
        mu sync.Mutex
100

101
        wg sync.WaitGroup
102
}
103

104
// A compile time assertion to ensure SignCoordinator meets the
105
// RemoteSignerRequests interface.
106
var _ RemoteSignerRequests = (*SignCoordinator)(nil)
107

108
// NewSignCoordinator creates a new instance of the SignCoordinator.
109
func NewSignCoordinator(requestTimeout time.Duration,
110
        connectionTimeout time.Duration) *SignCoordinator {
8✔
111

8✔
112
        respsMap := &lnutils.SyncMap[uint64, chan *signerResponse]{}
8✔
113

8✔
114
        s := &SignCoordinator{
8✔
115
                responses:         respsMap,
8✔
116
                receiveErrChan:    make(chan error, 1),
8✔
117
                doneReceiving:     make(chan struct{}),
8✔
118
                clientConnected:   make(chan struct{}),
8✔
119
                quit:              make(chan struct{}),
8✔
120
                requestTimeout:    requestTimeout,
8✔
121
                connectionTimeout: connectionTimeout,
8✔
122
        }
8✔
123

8✔
124
        // We initialize the atomic nextRequestID to the handshakeRequestID, as
8✔
125
        // requestID 1 is reserved for the initial handshake by the remote
8✔
126
        // signer.
8✔
127
        s.nextRequestID.Store(handshakeRequestID)
8✔
128

8✔
129
        return s
8✔
130
}
8✔
131

132
// Run starts the SignCoordinator and blocks until the remote signer either
133
// disconnects, the SignCoordinator is shut down, or an error occurs.
134
func (s *SignCoordinator) Run(stream StreamServer) error {
8✔
135
        s.mu.Lock()
8✔
136

8✔
137
        select {
8✔
NEW
138
        case <-s.quit:
×
NEW
139
                s.mu.Unlock()
×
NEW
140
                return ErrShuttingDown
×
141

NEW
142
        case <-s.doneReceiving:
×
NEW
143
                s.mu.Unlock()
×
NEW
144
                return ErrNotConnected
×
145

146
        default:
8✔
147
        }
148

149
        s.wg.Add(1)
8✔
150
        defer s.wg.Done()
8✔
151

8✔
152
        // If we already have a stream, we error out as we can only have one
8✔
153
        // connection throughout the lifetime of the SignCoordinator.
8✔
154
        if s.stream != nil {
8✔
NEW
155
                s.mu.Unlock()
×
NEW
156
                return ErrMultipleConnections
×
NEW
157
        }
×
158

159
        s.stream = stream
8✔
160

8✔
161
        s.mu.Unlock()
8✔
162

8✔
163
        // The handshake must be completed before we can start sending requests
8✔
164
        // to the remote signer.
8✔
165
        err := s.handshake(stream)
8✔
166
        if err != nil {
8✔
NEW
167
                return err
×
NEW
168
        }
×
169

170
        log.Infof("Remote signer connected")
8✔
171
        close(s.clientConnected)
8✔
172

8✔
173
        // Now let's start the main receiving loop, which will receive all
8✔
174
        // responses to our requests from the remote signer!
8✔
175
        // We start the receiving loop in a goroutine to ensure that this
8✔
176
        // function exits if the SignCoordinator is shut down (i.e. the s.quit
8✔
177
        // channel is closed). Returning from this function will cause the
8✔
178
        // stream to be closed, which in turn will cause the receiving loop to
8✔
179
        // exit.
8✔
180
        s.wg.Add(1)
8✔
181
        go s.StartReceiving()
8✔
182

8✔
183
        select {
8✔
184
        case err := <-s.receiveErrChan:
1✔
185
                return err
1✔
186

187
        case <-s.quit:
1✔
188
                return ErrShuttingDown
1✔
189

NEW
190
        case <-s.doneReceiving:
×
NEW
191
                return ErrNotConnected
×
192
        }
193
}
194

195
// Stop shuts down the SignCoordinator and waits until the main receiving loop
196
// has exited and all pending requests have been terminated.
197
func (s *SignCoordinator) Stop() {
1✔
198
        log.Infof("Stopping Sign Coordinator")
1✔
199
        defer log.Debugf("Sign coordinator stopped")
1✔
200

1✔
201
        // We lock the mutex before closing the quit channel to ensure that we
1✔
202
        // can't get a concurrent request into the SignCoordinator while we're
1✔
203
        // stopping it. That will ensure that the s.wg.Wait() call below will
1✔
204
        // always wait for any ongoing requests to finish before we return.
1✔
205
        s.mu.Lock()
1✔
206

1✔
207
        close(s.quit)
1✔
208

1✔
209
        s.mu.Unlock()
1✔
210

1✔
211
        s.wg.Wait()
1✔
212
}
1✔
213

214
// handshake performs the initial handshake with the remote signer. This must
215
// be done before any other requests are sent to the remote signer.
216
func (s *SignCoordinator) handshake(stream StreamServer) error {
8✔
217
        var (
8✔
218
                registerChan     = make(chan *walletrpc.SignerRegistration)
8✔
219
                registerDoneChan = make(chan struct{})
8✔
220
                errChan          = make(chan error)
8✔
221
        )
8✔
222

8✔
223
        // Create a context with a timeout using the context from the stream as
8✔
224
        // the parent context. This ensures that we'll exit if either the stream
8✔
225
        // is closed by the remote signer or if we time out.
8✔
226
        ctxt, cancel := context.WithTimeout(
8✔
227
                stream.Context(), s.requestTimeout,
8✔
228
        )
8✔
229
        defer cancel()
8✔
230

8✔
231
        // Read the first message in a goroutine because the Recv method blocks
8✔
232
        // until the message arrives.
8✔
233
        s.wg.Add(1)
8✔
234
        go func() {
16✔
235
                defer s.wg.Done()
8✔
236

8✔
237
                msg, err := stream.Recv()
8✔
238
                if err != nil {
8✔
NEW
239
                        select {
×
NEW
240
                        case errChan <- err:
×
NEW
241
                        case <-ctxt.Done():
×
242
                        }
243

NEW
244
                        return
×
245
                }
246

247
                if msg.GetRefRequestId() != handshakeRequestID {
8✔
NEW
248
                        err = fmt.Errorf("initial request ID must be %d, "+
×
NEW
249
                                "but is: %d", handshakeRequestID,
×
NEW
250
                                msg.GetRefRequestId())
×
NEW
251

×
NEW
252
                        select {
×
NEW
253
                        case errChan <- err:
×
NEW
254
                        case <-ctxt.Done():
×
255
                        }
256

NEW
257
                        return
×
258
                }
259

260
                switch req := msg.GetSignResponseType().(type) {
8✔
261
                case *walletrpc.SignCoordinatorResponse_SignerRegistration:
8✔
262
                        select {
8✔
263
                        case registerChan <- req.SignerRegistration:
8✔
NEW
264
                        case <-ctxt.Done():
×
265
                        }
266

267
                        return
8✔
268

NEW
269
                default:
×
NEW
270
                        err := fmt.Errorf("expected registration message, "+
×
NEW
271
                                "but got: %T", req)
×
NEW
272

×
NEW
273
                        select {
×
NEW
274
                        case errChan <- err:
×
NEW
275
                        case <-ctxt.Done():
×
276
                        }
277

NEW
278
                        return
×
279
                }
280
        }()
281

282
        // Wait for the initial message to arrive or time out if it takes too
283
        // long. The initial message must be a registration message from the
284
        // remote signer.
285
        select {
8✔
286
        case signerRegistration := <-registerChan:
8✔
287
                // TODO(viktor): This could be extended to validate the version
8✔
288
                // of the remote signer in the future.
8✔
289
                if signerRegistration.GetRegistrationInfo() == "" {
8✔
NEW
290
                        return errors.New("invalid remote signer " +
×
NEW
291
                                "registration info")
×
NEW
292
                }
×
293

294
                // Todo(viktor): The RegistrationChallenge in the
295
                // signerRegistration should likely also be signed here.
296

NEW
297
        case err := <-errChan:
×
NEW
298
                return fmt.Errorf("error receiving initial remote signer "+
×
NEW
299
                        "registration message: %v", err)
×
300

NEW
301
        case <-s.quit:
×
NEW
302
                return ErrShuttingDown
×
303

NEW
304
        case <-ctxt.Done():
×
NEW
305
                return ctxt.Err()
×
306
        }
307

308
        complete := &walletrpc.RegistrationResponse_RegistrationComplete{
8✔
309
                // TODO(viktor): The signature should be generated by signing
8✔
310
                // the RegistrationChallenge contained in the SignerRegistration
8✔
311
                // message in the future.
8✔
312
                // The RegistrationInfo could also be extended to include info
8✔
313
                // about the watch-only node in the future.
8✔
314
                RegistrationComplete: &walletrpc.RegistrationComplete{
8✔
315
                        Signature:        "",
8✔
316
                        RegistrationInfo: "watch-only registration info",
8✔
317
                },
8✔
318
        }
8✔
319
        // Send a message to the client to indicate that the registration has
8✔
320
        // successfully completed.
8✔
321
        req := &walletrpc.SignCoordinatorRequest_RegistrationResponse{
8✔
322
                RegistrationResponse: &walletrpc.RegistrationResponse{
8✔
323
                        RegistrationResponseType: complete,
8✔
324
                },
8✔
325
        }
8✔
326

8✔
327
        regCompleteMsg := &walletrpc.SignCoordinatorRequest{
8✔
328
                RequestId:       handshakeRequestID,
8✔
329
                SignRequestType: req,
8✔
330
        }
8✔
331

8✔
332
        // Send the message in a goroutine because the Send method blocks until
8✔
333
        // the message is read by the client.
8✔
334
        s.wg.Add(1)
8✔
335
        go func() {
16✔
336
                defer s.wg.Done()
8✔
337

8✔
338
                err := stream.Send(regCompleteMsg)
8✔
339
                if err != nil {
8✔
NEW
340
                        select {
×
NEW
341
                        case errChan <- err:
×
NEW
342
                        case <-ctxt.Done():
×
343
                        }
344

NEW
345
                        return
×
346
                }
347

348
                close(registerDoneChan)
8✔
349
        }()
350

351
        select {
8✔
NEW
352
        case err := <-errChan:
×
NEW
353
                return fmt.Errorf("error sending registration complete "+
×
NEW
354
                        " message to remote signer: %v", err)
×
355

NEW
356
        case <-ctxt.Done():
×
NEW
357
                return ctxt.Err()
×
358

NEW
359
        case <-s.quit:
×
NEW
360
                return ErrShuttingDown
×
361

362
        case <-registerDoneChan:
8✔
363
        }
364

365
        return nil
8✔
366
}
367

368
// StartReceiving is the main receive loop that receives responses from the
369
// remote signer. Responses must have a RequestID that corresponds to requests
370
// which are waiting for a response; otherwise, the response is ignored.
371
func (s *SignCoordinator) StartReceiving() {
8✔
372
        defer s.wg.Done()
8✔
373

8✔
374
        // Signals to any ongoing requests that the remote signer is no longer
8✔
375
        // connected.
8✔
376
        defer close(s.doneReceiving)
8✔
377

8✔
378
        for {
25✔
379
                resp, err := s.stream.Recv()
17✔
380
                if err != nil {
19✔
381
                        select {
2✔
382
                        // If we've already shut down, the main Run method will
383
                        // not be able to receive any error sent over the error
384
                        // channel. So we just return.
385
                        case <-s.quit:
1✔
386

387
                        // Send the error over the error channel, so that the
388
                        // main Run method can return the error.
389
                        case s.receiveErrChan <- err:
1✔
390
                        }
391

392
                        return
2✔
393
                }
394

395
                respChan, ok := s.responses.Load(resp.GetRefRequestId())
9✔
396

9✔
397
                if ok {
16✔
398
                        select {
7✔
399
                        // We should always be able to send over the response
400
                        // channel, as the channel allows for a buffer of 1, and
401
                        // we shouldn't have multiple requests and responses for
402
                        // the same request ID.
403
                        case respChan <- resp:
7✔
404

NEW
405
                        case <-s.quit:
×
NEW
406
                                return
×
407

408
                        // The timeout case be unreachable, as we should always
409
                        // be able to send 1 response over the response channel.
410
                        // We keep this case just to avoid a scenario where the
411
                        // receive loop would be blocked if we receive multiple
412
                        // responses for the same request ID.
NEW
413
                        case <-time.After(s.requestTimeout):
×
414
                        }
415
                }
416

417
                // If there's no response channel, the thread waiting for the
418
                // response has most likely timed out. We therefore ignore the
419
                // response. The other scenario where we don't have a response
420
                // channel would be if we received a response for a request that
421
                // we didn't send. This should never happen, but if it does, we
422
                // ignore the response.
423

424
                select {
9✔
NEW
425
                case <-s.quit:
×
NEW
426
                        return
×
427
                default:
9✔
428
                }
429
        }
430
}
431

432
// WaitUntilConnected waits until the remote signer has connected. If the remote
433
// signer does not connect within the configured connection timeout, an error is
434
// returned.
NEW
435
func (s *SignCoordinator) WaitUntilConnected() error {
×
NEW
436
        return s.waitUntilConnectedWithTimeout(s.connectionTimeout)
×
NEW
437
}
×
438

439
// waitUntilConnectedWithTimeout waits until the remote signer has connected. If
440
// the remote signer does not connect within the given timeout, an error is
441
// returned.
442
func (s *SignCoordinator) waitUntilConnectedWithTimeout(
443
        timeout time.Duration) error {
12✔
444

12✔
445
        select {
12✔
446
        case <-s.clientConnected:
12✔
447
                return nil
12✔
448

NEW
449
        case <-s.quit:
×
NEW
450
                return ErrShuttingDown
×
451

NEW
452
        case <-time.After(timeout):
×
NEW
453
                return ErrConnectTimeout
×
454

NEW
455
        case <-s.doneReceiving:
×
NEW
456
                return ErrNotConnected
×
457
        }
458
}
459

460
// createResponseChannel creates a response channel for the given request ID and
461
// inserts it into the responses map. The function returns a cleanup function
462
// which removes the channel from the responses map, and the caller must ensure
463
// that this cleanup function is executed once the thread that's waiting for
464
// the response is done.
465
func (s *SignCoordinator) createResponseChannel(requestID uint64) func() {
12✔
466
        // Create a new response channel.
12✔
467
        respChan := make(chan *signerResponse, 1)
12✔
468

12✔
469
        // Insert the response channel into the map.
12✔
470
        s.responses.Store(requestID, respChan)
12✔
471

12✔
472
        // Create a cleanup function that will delete the response channel.
12✔
473
        return func() {
24✔
474
                select {
12✔
475
                // If we have timed out, there could be a very unlikely
476
                // scenario where we did receive a response before we managed to
477
                // grab the lock in the cleanup func. In that case, we'll just
478
                // ignore the response. We should still clean up the response
479
                // channel though.
NEW
480
                case <-respChan:
×
481
                default:
12✔
482
                }
483

484
                s.responses.Delete(requestID)
12✔
485
        }
486
}
487

488
// getResponse waits for a response with the given request ID and returns the
489
// response if it is received. If the corresponding response from the remote
490
// signer is a SignerError, the error message is returned. If the response is
491
// not received within the given timeout, an error is returned.
492
//
493
// Note: Before calling this function, the caller must have created a response
494
// channel for the request ID.
495
func (s *SignCoordinator) getResponse(requestID uint64,
496
        timeout time.Duration) (*signerResponse, error) {
12✔
497

12✔
498
        respChan, ok := s.responses.Load(requestID)
12✔
499

12✔
500
        // Verify that we have a response channel for the request ID.
12✔
501
        if !ok {
12✔
NEW
502
                // It should be impossible to reach this case, as we create the
×
NEW
503
                // response channel before sending the request.
×
NEW
504
                return nil, fmt.Errorf("no response channel found for "+
×
NEW
505
                        "request ID %d", requestID)
×
NEW
506
        }
×
507

508
        // Wait for the response to arrive.
509
        select {
12✔
510
        case resp, ok := <-respChan:
7✔
511
                if !ok {
7✔
NEW
512
                        // If the response channel was closed, we return an
×
NEW
513
                        // error as the receiving thread must have timed out
×
NEW
514
                        // before we managed to grab the response.
×
NEW
515
                        return nil, ErrRequestTimeout
×
NEW
516
                }
×
517

518
                // a temp type alias to limit the length of the line below.
519
                type sErr = walletrpc.SignCoordinatorResponse_SignerError
7✔
520

7✔
521
                // If the response is an error, we return the error message.
7✔
522
                if errorResp, ok := resp.GetSignResponseType().(*sErr); ok {
8✔
523
                        errStr := errorResp.SignerError.GetError()
1✔
524

1✔
525
                        log.Debugf("Received an error response from remote "+
1✔
526
                                "signer for request ID %d. Error: %v",
1✔
527
                                requestID, errStr)
1✔
528

1✔
529
                        return nil, errors.New(errStr)
1✔
530
                }
1✔
531

532
                log.Debugf("Received remote signer %T response for request "+
6✔
533
                        "ID %d", resp.GetSignResponseType(), requestID)
6✔
534

6✔
535
                log.Tracef("Remote signer response content: %v",
6✔
536
                        formatSignCoordinatorMsg(resp))
6✔
537

6✔
538
                return resp, nil
6✔
539

540
        case <-s.doneReceiving:
1✔
541
                log.Debugf("Stopped waiting for remote signer response for "+
1✔
542
                        "request ID %d as the stream has been closed",
1✔
543
                        requestID)
1✔
544

1✔
545
                return nil, ErrNotConnected
1✔
546

547
        case <-s.quit:
1✔
548
                log.Debugf("Stopped waiting for remote signer response for "+
1✔
549
                        "request ID %d as we're shutting down", requestID)
1✔
550

1✔
551
                return nil, ErrShuttingDown
1✔
552

553
        case <-time.After(timeout):
3✔
554
                log.Debugf("Remote signer response timed out for request ID %d",
3✔
555
                        requestID)
3✔
556

3✔
557
                return nil, ErrRequestTimeout
3✔
558
        }
559
}
560

561
// registerRequest registers a new request with the SignCoordinator, ensuring it
562
// awaits the handling of the request before shutting down. The function returns
563
// a Done function that must be executed once the request has been handled.
564
func (s *SignCoordinator) registerRequest() (func(), error) {
12✔
565
        // We lock the mutex to ensure that we can't have a race where we'd
12✔
566
        // register a request while shutting down.
12✔
567
        s.mu.Lock()
12✔
568
        defer s.mu.Unlock()
12✔
569

12✔
570
        select {
12✔
NEW
571
        case <-s.quit:
×
NEW
572
                return nil, ErrShuttingDown
×
573
        default:
12✔
574
        }
575

576
        s.wg.Add(1)
12✔
577

12✔
578
        return func() {
24✔
579
                s.wg.Done()
12✔
580
        }, nil
12✔
581
}
582

583
// Ping sends a ping request to the remote signer and waits for a pong response.
584
func (s *SignCoordinator) Ping(timeout time.Duration) (bool, error) {
12✔
585
        req := &walletrpc.SignCoordinatorRequest_Ping{
12✔
586
                Ping: true,
12✔
587
        }
12✔
588

12✔
589
        return processRequest(
12✔
590
                s, timeout, // As we're pinging, we specify a time limit.
12✔
591
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
24✔
592
                        return walletrpc.SignCoordinatorRequest{
12✔
593
                                RequestId:       reqId,
12✔
594
                                SignRequestType: req,
12✔
595
                        }
12✔
596
                },
12✔
597
                func(resp *signerResponse) bool {
6✔
598
                        return resp.GetPong()
6✔
599
                },
6✔
600
        )
601
}
602

603
// DeriveSharedKey sends a SharedKeyRequest to the remote signer and waits for
604
// the corresponding response.
605
//
606
// NOTE: This is part of the RemoteSignerRequests interface.
607
func (s *SignCoordinator) DeriveSharedKey(_ context.Context,
608
        in *signrpc.SharedKeyRequest,
NEW
609
        _ ...grpc.CallOption) (*signrpc.SharedKeyResponse, error) {
×
NEW
610

×
NEW
611
        req := &walletrpc.SignCoordinatorRequest_SharedKeyRequest{
×
NEW
612
                SharedKeyRequest: in,
×
NEW
613
        }
×
NEW
614

×
NEW
615
        return processRequest(
×
NEW
616
                s, noTimeout,
×
NEW
617
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
×
NEW
618
                        return walletrpc.SignCoordinatorRequest{
×
NEW
619
                                RequestId:       reqId,
×
NEW
620
                                SignRequestType: req,
×
NEW
621
                        }
×
NEW
622
                },
×
NEW
623
                func(resp *signerResponse) *signrpc.SharedKeyResponse {
×
NEW
624
                        return resp.GetSharedKeyResponse()
×
NEW
625
                },
×
626
        )
627
}
628

629
// MuSig2Cleanup sends a MuSig2CleanupRequest to the remote signer and waits for
630
// the corresponding response.
631
//
632
// NOTE: This is part of the RemoteSignerRequests interface.
633
func (s *SignCoordinator) MuSig2Cleanup(_ context.Context,
634
        in *signrpc.MuSig2CleanupRequest,
NEW
635
        _ ...grpc.CallOption) (*signrpc.MuSig2CleanupResponse, error) {
×
NEW
636

×
NEW
637
        req := &walletrpc.SignCoordinatorRequest_MuSig2CleanupRequest{
×
NEW
638
                MuSig2CleanupRequest: in,
×
NEW
639
        }
×
NEW
640

×
NEW
641
        return processRequest(
×
NEW
642
                s, noTimeout,
×
NEW
643
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
×
NEW
644
                        return walletrpc.SignCoordinatorRequest{
×
NEW
645
                                RequestId:       reqId,
×
NEW
646
                                SignRequestType: req,
×
NEW
647
                        }
×
NEW
648
                },
×
NEW
649
                func(resp *signerResponse) *signrpc.MuSig2CleanupResponse {
×
NEW
650
                        return resp.GetMuSig2CleanupResponse()
×
NEW
651
                },
×
652
        )
653
}
654

655
// MuSig2CombineSig sends a MuSig2CombineSigRequest to the remote signer and
656
// waits for the corresponding response.
657
//
658
// NOTE: This is part of the RemoteSignerRequests interface.
659
func (s *SignCoordinator) MuSig2CombineSig(_ context.Context,
660
        in *signrpc.MuSig2CombineSigRequest,
NEW
661
        _ ...grpc.CallOption) (*signrpc.MuSig2CombineSigResponse, error) {
×
NEW
662

×
NEW
663
        req := &walletrpc.SignCoordinatorRequest_MuSig2CombineSigRequest{
×
NEW
664
                MuSig2CombineSigRequest: in,
×
NEW
665
        }
×
NEW
666

×
NEW
667
        return processRequest(
×
NEW
668
                s, noTimeout,
×
NEW
669
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
×
NEW
670
                        return walletrpc.SignCoordinatorRequest{
×
NEW
671
                                RequestId:       reqId,
×
NEW
672
                                SignRequestType: req,
×
NEW
673
                        }
×
NEW
674
                },
×
NEW
675
                func(resp *signerResponse) *signrpc.MuSig2CombineSigResponse {
×
NEW
676
                        return resp.GetMuSig2CombineSigResponse()
×
NEW
677
                },
×
678
        )
679
}
680

681
// MuSig2CreateSession sends a MuSig2SessionRequest to the remote signer and
682
// waits for the corresponding response.
683
//
684
// NOTE: This is part of the RemoteSignerRequests interface.
685
func (s *SignCoordinator) MuSig2CreateSession(_ context.Context,
686
        in *signrpc.MuSig2SessionRequest,
NEW
687
        _ ...grpc.CallOption) (*signrpc.MuSig2SessionResponse, error) {
×
NEW
688

×
NEW
689
        req := &walletrpc.SignCoordinatorRequest_MuSig2SessionRequest{
×
NEW
690
                MuSig2SessionRequest: in,
×
NEW
691
        }
×
NEW
692

×
NEW
693
        return processRequest(
×
NEW
694
                s, noTimeout,
×
NEW
695
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
×
NEW
696
                        return walletrpc.SignCoordinatorRequest{
×
NEW
697
                                RequestId:       reqId,
×
NEW
698
                                SignRequestType: req,
×
NEW
699
                        }
×
NEW
700
                },
×
NEW
701
                func(resp *signerResponse) *signrpc.MuSig2SessionResponse {
×
NEW
702
                        return resp.GetMuSig2SessionResponse()
×
NEW
703
                },
×
704
        )
705
}
706

707
// MuSig2RegisterNonces sends a MuSig2RegisterNoncesRequest to the remote signer
708
// and waits for the corresponding response.
709
//
710
// NOTE: This is part of the RemoteSignerRequests interface.
711
func (s *SignCoordinator) MuSig2RegisterNonces(_ context.Context,
712
        in *signrpc.MuSig2RegisterNoncesRequest,
713
        _ ...grpc.CallOption) (*signrpc.MuSig2RegisterNoncesResponse,
NEW
714
        error) {
×
NEW
715

×
NEW
716
        req := &walletrpc.SignCoordinatorRequest_MuSig2RegisterNoncesRequest{
×
NEW
717
                MuSig2RegisterNoncesRequest: in,
×
NEW
718
        }
×
NEW
719

×
NEW
720
        type muSig2RegisterNoncesResp = *signrpc.MuSig2RegisterNoncesResponse
×
NEW
721

×
NEW
722
        return processRequest(
×
NEW
723
                s, noTimeout,
×
NEW
724
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
×
NEW
725
                        return walletrpc.SignCoordinatorRequest{
×
NEW
726
                                RequestId:       reqId,
×
NEW
727
                                SignRequestType: req,
×
NEW
728
                        }
×
NEW
729
                },
×
NEW
730
                func(resp *signerResponse) muSig2RegisterNoncesResp {
×
NEW
731
                        return resp.GetMuSig2RegisterNoncesResponse()
×
NEW
732
                },
×
733
        )
734
}
735

736
// MuSig2Sign sends a MuSig2SignRequest to the remote signer and waits for the
737
// corresponding response.
738
//
739
// NOTE: This is part of the RemoteSignerRequests interface.
740
func (s *SignCoordinator) MuSig2Sign(_ context.Context,
741
        in *signrpc.MuSig2SignRequest,
NEW
742
        _ ...grpc.CallOption) (*signrpc.MuSig2SignResponse, error) {
×
NEW
743

×
NEW
744
        req := &walletrpc.SignCoordinatorRequest_MuSig2SignRequest{
×
NEW
745
                MuSig2SignRequest: in,
×
NEW
746
        }
×
NEW
747

×
NEW
748
        return processRequest(
×
NEW
749
                s, noTimeout,
×
NEW
750
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
×
NEW
751
                        return walletrpc.SignCoordinatorRequest{
×
NEW
752
                                RequestId:       reqId,
×
NEW
753
                                SignRequestType: req,
×
NEW
754
                        }
×
NEW
755
                },
×
NEW
756
                func(resp *signerResponse) *signrpc.MuSig2SignResponse {
×
NEW
757
                        return resp.GetMuSig2SignResponse()
×
NEW
758
                },
×
759
        )
760
}
761

762
// SignMessage sends a SignMessageReq to the remote signer and waits for the
763
// corresponding response.
764
//
765
// NOTE: This is part of the RemoteSignerRequests interface.
766
func (s *SignCoordinator) SignMessage(_ context.Context,
767
        in *signrpc.SignMessageReq,
NEW
768
        _ ...grpc.CallOption) (*signrpc.SignMessageResp, error) {
×
NEW
769

×
NEW
770
        req := &walletrpc.SignCoordinatorRequest_SignMessageReq{
×
NEW
771
                SignMessageReq: in,
×
NEW
772
        }
×
NEW
773

×
NEW
774
        return processRequest(
×
NEW
775
                s, noTimeout,
×
NEW
776
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
×
NEW
777
                        return walletrpc.SignCoordinatorRequest{
×
NEW
778
                                RequestId:       reqId,
×
NEW
779
                                SignRequestType: req,
×
NEW
780
                        }
×
NEW
781
                },
×
NEW
782
                func(resp *signerResponse) *signrpc.SignMessageResp {
×
NEW
783
                        return resp.GetSignMessageResp()
×
NEW
784
                },
×
785
        )
786
}
787

788
// SignPsbt sends a SignPsbtRequest to the remote signer and waits for the
789
// corresponding response.
790
//
791
// NOTE: This is part of the RemoteSignerRequests interface.
792
func (s *SignCoordinator) SignPsbt(_ context.Context,
793
        in *walletrpc.SignPsbtRequest,
NEW
794
        _ ...grpc.CallOption) (*walletrpc.SignPsbtResponse, error) {
×
NEW
795

×
NEW
796
        req := &walletrpc.SignCoordinatorRequest_SignPsbtRequest{
×
NEW
797
                SignPsbtRequest: in,
×
NEW
798
        }
×
NEW
799

×
NEW
800
        return processRequest(
×
NEW
801
                s, noTimeout,
×
NEW
802
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
×
NEW
803
                        return walletrpc.SignCoordinatorRequest{
×
NEW
804
                                RequestId:       reqId,
×
NEW
805
                                SignRequestType: req,
×
NEW
806
                        }
×
NEW
807
                },
×
NEW
808
                func(resp *signerResponse) *walletrpc.SignPsbtResponse {
×
NEW
809
                        return resp.GetSignPsbtResponse()
×
NEW
810
                },
×
811
        )
812
}
813

814
// processRequest is a generic function that sends a request to the remote
815
// signer and waits for the corresponding response. If a timeout is set, the
816
// function will limit the execution time of the entire function to the
817
// specified timeout. If it is not set, configured timeouts will be used for
818
// the individual operations within the function.
819
func processRequest[R comparable](s *SignCoordinator, timeout time.Duration,
820
        generateRequest func(uint64) walletrpc.SignCoordinatorRequest,
821
        extractResponse func(*signerResponse) R) (R, error) {
12✔
822

12✔
823
        var zero R
12✔
824

12✔
825
        done, err := s.registerRequest()
12✔
826
        if err != nil {
12✔
NEW
827
                return zero, err
×
NEW
828
        }
×
829
        defer done()
12✔
830

12✔
831
        startTime := time.Now()
12✔
832

12✔
833
        // If a timeout is enforced, we will wait for the connection using the
12✔
834
        // specified timeout. Otherwise, we will wait for the connection using
12✔
835
        // the configured connection timeout.
12✔
836
        if timeout != 0 {
24✔
837
                err = s.waitUntilConnectedWithTimeout(timeout)
12✔
838
        } else {
12✔
NEW
839
                err = s.WaitUntilConnected()
×
NEW
840
        }
×
841

842
        if err != nil {
12✔
NEW
843
                return zero, err
×
NEW
844
        }
×
845

846
        reqID := s.nextRequestID.Add(1)
12✔
847
        req := generateRequest(reqID)
12✔
848

12✔
849
        cleanUpChannel := s.createResponseChannel(reqID)
12✔
850
        defer cleanUpChannel()
12✔
851

12✔
852
        log.Debugf("Sending a %T to the remote signer with request ID %d",
12✔
853
                req.SignRequestType, reqID)
12✔
854

12✔
855
        log.Tracef("Request content: %v", formatSignCoordinatorMsg(&req))
12✔
856

12✔
857
        err = s.stream.Send(&req)
12✔
858
        if err != nil {
12✔
NEW
859
                return zero, err
×
NEW
860
        }
×
861

862
        var resp *walletrpc.SignCoordinatorResponse
12✔
863

12✔
864
        // If a timeout is enforced, we need to limit the entire execution time
12✔
865
        // of this function to the timeout. Therefore, we need to calculate the
12✔
866
        // remaining allowed execution time.
12✔
867
        // If no timeout is enforced, we will wait for the response using the
12✔
868
        // configured request timeout.
12✔
869
        if timeout != 0 {
24✔
870
                newTimeout := timeout - time.Since(startTime)
12✔
871

12✔
872
                if time.Since(startTime) > timeout {
12✔
NEW
873
                        return zero, ErrRequestTimeout
×
NEW
874
                }
×
875

876
                resp, err = s.getResponse(reqID, newTimeout)
12✔
NEW
877
        } else {
×
NEW
878
                resp, err = s.getResponse(reqID, s.requestTimeout)
×
NEW
879
        }
×
880

881
        if err != nil {
18✔
882
                return zero, err
6✔
883
        }
6✔
884

885
        rpcResp := extractResponse(resp)
6✔
886
        if rpcResp == zero {
6✔
NEW
887
                return zero, ErrUnexpectedResponse
×
NEW
888
        }
×
889

890
        return rpcResp, nil
6✔
891
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc