• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12412879685

19 Dec 2024 12:40PM UTC coverage: 58.744% (+0.09%) from 58.653%
12412879685

Pull #8754

github

ViktorTigerstrom
itest: wrap deriveCustomScopeAccounts at 80 chars

This commit fixes that word wrapping for the deriveCustomScopeAccounts
function docs, and ensures that it wraps at 80 characters or less.
Pull Request #8754: Add `Outbound` Remote Signer implementation

1858 of 2816 new or added lines in 47 files covered. (65.98%)

267 existing lines in 51 files now uncovered.

136038 of 231578 relevant lines covered (58.74%)

19020.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.46
/lnwallet/rpcwallet/sign_coordinator.go
1
package rpcwallet
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/lnrpc/signrpc"
12
        "github.com/lightningnetwork/lnd/lnrpc/walletrpc"
13
        "github.com/lightningnetwork/lnd/lnutils"
14
        "google.golang.org/grpc"
15
)
16

17
var (
18
        // ErrRequestTimeout is the error that's returned if we time out while
19
        // waiting for a response from the remote signer.
20
        ErrRequestTimeout = errors.New("remote signer response timeout reached")
21

22
        // ErrConnectTimeout is the error that's returned if we time out while
23
        // waiting for the remote signer to connect.
24
        ErrConnectTimeout = errors.New("timed out when waiting for remote " +
25
                "signer to connect")
26

27
        // ErrMultipleConnections is the error that's returned if another
28
        // remote signer attempts to connect while we already have one
29
        // connected.
30
        ErrMultipleConnections = errors.New("only one remote signer can be " +
31
                "connected")
32

33
        // ErrNotConnected is the error that's returned if the remote signer
34
        // closes the stream or we encounter an error when receiving over the
35
        // stream.
36
        ErrNotConnected = errors.New("the remote signer is no longer connected")
37

38
        // ErrUnexpectedResponse is the error that's returned if the response
39
        // with the expected request ID from the remote signer is of an
40
        // unexpected type.
41
        ErrUnexpectedResponse = errors.New("unexpected response type")
42
)
43

44
const (
45
        // noTimeout is a constant that can be used to indicate that no timeout
46
        // should be enforced when waiting for a response from the remote
47
        // signer.
48
        noTimeout = 0
49
)
50

51
// SignCoordinator is an implementation of the signrpc.SignerClient and the
52
// walletrpc.WalletKitClient interfaces that passes on all requests to a remote
53
// signer. It is used by the watch-only wallet to delegate any signing or ECDH
54
// operations to a remote node over a
55
// walletrpc.WalletKit_SignCoordinatorStreamsServer stream. The stream is set up
56
// by the remote signer when it connects to the watch-only wallet, which should
57
// execute the Run method.
58
type SignCoordinator struct {
59
        // nextRequestID keeps track of the next request ID that should
60
        // be used when sending a request to the remote signer.
61
        nextRequestID atomic.Uint64
62

63
        // stream is a bi-directional stream between us and the remote signer.
64
        stream StreamServer
65

66
        // responses is a map of request IDs to response channels. This map
67
        // should be populated with a response channel for each request that has
68
        // been sent to the remote signer. The response channel should be
69
        // inserted into the map before the request is sent.
70
        // Any response received over the stream that does not have an
71
        // associated response channel in this map is ignored.
72
        // The response channel should be removed from the map when the response
73
        // has been received and processed.
74
        responses *lnutils.SyncMap[uint64, chan *signerResponse]
75

76
        // receiveErrChan is used to signal that the stream with the remote
77
        // signer has errored, and we can no longer process responses.
78
        receiveErrChan chan error
79

80
        // doneReceiving is closed when either party terminates and signals to
81
        // any pending requests that we'll no longer process the response for
82
        // that request.
83
        doneReceiving chan struct{}
84

85
        // quit is closed when lnd is shutting down.
86
        quit chan struct{}
87

88
        // clientConnected is sent over when the remote signer connects.
89
        clientConnected chan struct{}
90

91
        // requestTimeout is the maximum time we will wait for a response from
92
        // the remote signer.
93
        requestTimeout time.Duration
94

95
        // connectionTimeout is the maximum time we will wait for the remote
96
        // signer to connect.
97
        connectionTimeout time.Duration
98

99
        mu sync.Mutex
100

101
        wg sync.WaitGroup
102
}
103

104
// A compile time assertion to ensure SignCoordinator meets the
105
// RemoteSignerRequests interface.
106
var _ RemoteSignerRequests = (*SignCoordinator)(nil)
107

108
// NewSignCoordinator creates a new instance of the SignCoordinator.
109
func NewSignCoordinator(requestTimeout time.Duration,
110
        connectionTimeout time.Duration) *SignCoordinator {
11✔
111

11✔
112
        respsMap := &lnutils.SyncMap[uint64, chan *signerResponse]{}
11✔
113

11✔
114
        s := &SignCoordinator{
11✔
115
                responses:         respsMap,
11✔
116
                receiveErrChan:    make(chan error, 1),
11✔
117
                doneReceiving:     make(chan struct{}),
11✔
118
                clientConnected:   make(chan struct{}),
11✔
119
                quit:              make(chan struct{}),
11✔
120
                requestTimeout:    requestTimeout,
11✔
121
                connectionTimeout: connectionTimeout,
11✔
122
        }
11✔
123

11✔
124
        // We initialize the atomic nextRequestID to the handshakeRequestID, as
11✔
125
        // requestID 1 is reserved for the initial handshake by the remote
11✔
126
        // signer.
11✔
127
        s.nextRequestID.Store(handshakeRequestID)
11✔
128

11✔
129
        return s
11✔
130
}
11✔
131

132
// Run starts the SignCoordinator and blocks until the remote signer either
133
// disconnects, the SignCoordinator is shut down, or an error occurs.
134
func (s *SignCoordinator) Run(stream StreamServer) error {
11✔
135
        s.mu.Lock()
11✔
136

11✔
137
        select {
11✔
NEW
138
        case <-s.quit:
×
NEW
139
                s.mu.Unlock()
×
NEW
140
                return ErrShuttingDown
×
141

NEW
142
        case <-s.doneReceiving:
×
NEW
143
                s.mu.Unlock()
×
NEW
144
                return ErrNotConnected
×
145

146
        default:
11✔
147
        }
148

149
        s.wg.Add(1)
11✔
150
        defer s.wg.Done()
11✔
151

11✔
152
        // If we already have a stream, we error out as we can only have one
11✔
153
        // connection throughout the lifetime of the SignCoordinator.
11✔
154
        if s.stream != nil {
11✔
NEW
155
                s.mu.Unlock()
×
NEW
156
                return ErrMultipleConnections
×
NEW
157
        }
×
158

159
        s.stream = stream
11✔
160

11✔
161
        s.mu.Unlock()
11✔
162

11✔
163
        // The handshake must be completed before we can start sending requests
11✔
164
        // to the remote signer.
11✔
165
        err := s.handshake(stream)
11✔
166
        if err != nil {
11✔
NEW
167
                return err
×
NEW
168
        }
×
169

170
        log.Infof("Remote signer connected")
11✔
171
        close(s.clientConnected)
11✔
172

11✔
173
        // Now let's start the main receiving loop, which will receive all
11✔
174
        // responses to our requests from the remote signer!
11✔
175
        // We start the receiving loop in a goroutine to ensure that this
11✔
176
        // function exits if the SignCoordinator is shut down (i.e. the s.quit
11✔
177
        // channel is closed). Returning from this function will cause the
11✔
178
        // stream to be closed, which in turn will cause the receiving loop to
11✔
179
        // exit.
11✔
180
        s.wg.Add(1)
11✔
181
        go s.StartReceiving()
11✔
182

11✔
183
        select {
11✔
184
        case err := <-s.receiveErrChan:
3✔
185
                return err
3✔
186

187
        case <-s.quit:
4✔
188
                return ErrShuttingDown
4✔
189

NEW
190
        case <-s.doneReceiving:
×
NEW
191
                return ErrNotConnected
×
192
        }
193
}
194

195
// Stop shuts down the SignCoordinator and waits until the main receiving loop
196
// has exited and all pending requests have been terminated.
197
func (s *SignCoordinator) Stop() {
4✔
198
        log.Infof("Stopping Sign Coordinator")
4✔
199
        defer log.Debugf("Sign coordinator stopped")
4✔
200

4✔
201
        // We lock the mutex before closing the quit channel to ensure that we
4✔
202
        // can't get a concurrent request into the SignCoordinator while we're
4✔
203
        // stopping it. That will ensure that the s.wg.Wait() call below will
4✔
204
        // always wait for any ongoing requests to finish before we return.
4✔
205
        s.mu.Lock()
4✔
206

4✔
207
        close(s.quit)
4✔
208

4✔
209
        s.mu.Unlock()
4✔
210

4✔
211
        s.wg.Wait()
4✔
212
}
4✔
213

214
// handshake performs the initial handshake with the remote signer. This must
215
// be done before any other requests are sent to the remote signer.
216
func (s *SignCoordinator) handshake(stream StreamServer) error {
11✔
217
        var (
11✔
218
                registerChan     = make(chan *walletrpc.SignerRegistration)
11✔
219
                registerDoneChan = make(chan struct{})
11✔
220
                errChan          = make(chan error)
11✔
221
        )
11✔
222

11✔
223
        // Create a context with a timeout using the context from the stream as
11✔
224
        // the parent context. This ensures that we'll exit if either the stream
11✔
225
        // is closed by the remote signer or if we time out.
11✔
226
        ctxt, cancel := context.WithTimeout(
11✔
227
                stream.Context(), s.requestTimeout,
11✔
228
        )
11✔
229
        defer cancel()
11✔
230

11✔
231
        // Read the first message in a goroutine because the Recv method blocks
11✔
232
        // until the message arrives.
11✔
233
        s.wg.Add(1)
11✔
234
        go func() {
22✔
235
                defer s.wg.Done()
11✔
236

11✔
237
                msg, err := stream.Recv()
11✔
238
                if err != nil {
11✔
NEW
239
                        select {
×
NEW
240
                        case errChan <- err:
×
NEW
241
                        case <-ctxt.Done():
×
242
                        }
243

NEW
244
                        return
×
245
                }
246

247
                if msg.GetRefRequestId() != handshakeRequestID {
11✔
NEW
248
                        err = fmt.Errorf("initial request ID must be %d, "+
×
NEW
249
                                "but is: %d", handshakeRequestID,
×
NEW
250
                                msg.GetRefRequestId())
×
NEW
251

×
NEW
252
                        select {
×
NEW
253
                        case errChan <- err:
×
NEW
254
                        case <-ctxt.Done():
×
255
                        }
256

NEW
257
                        return
×
258
                }
259

260
                switch req := msg.GetSignResponseType().(type) {
11✔
261
                case *walletrpc.SignCoordinatorResponse_SignerRegistration:
11✔
262
                        select {
11✔
263
                        case registerChan <- req.SignerRegistration:
11✔
NEW
264
                        case <-ctxt.Done():
×
265
                        }
266

267
                        return
11✔
268

NEW
269
                default:
×
NEW
270
                        err := fmt.Errorf("expected registration message, "+
×
NEW
271
                                "but got: %T", req)
×
NEW
272

×
NEW
273
                        select {
×
NEW
274
                        case errChan <- err:
×
NEW
275
                        case <-ctxt.Done():
×
276
                        }
277

NEW
278
                        return
×
279
                }
280
        }()
281

282
        // Wait for the initial message to arrive or time out if it takes too
283
        // long. The initial message must be a registration message from the
284
        // remote signer.
285
        select {
11✔
286
        case signerRegistration := <-registerChan:
11✔
287
                // TODO(viktor): This could be extended to validate the version
11✔
288
                // of the remote signer in the future.
11✔
289
                if signerRegistration.GetRegistrationInfo() == "" {
11✔
NEW
290
                        return errors.New("invalid remote signer " +
×
NEW
291
                                "registration info")
×
NEW
292
                }
×
293

294
                // Todo(viktor): The RegistrationChallenge in the
295
                // signerRegistration should likely also be signed here.
296

NEW
297
        case err := <-errChan:
×
NEW
298
                return fmt.Errorf("error receiving initial remote signer "+
×
NEW
299
                        "registration message: %v", err)
×
300

NEW
301
        case <-s.quit:
×
NEW
302
                return ErrShuttingDown
×
303

NEW
304
        case <-ctxt.Done():
×
NEW
305
                return ctxt.Err()
×
306
        }
307

308
        complete := &walletrpc.RegistrationResponse_RegistrationComplete{
11✔
309
                // TODO(viktor): The signature should be generated by signing
11✔
310
                // the RegistrationChallenge contained in the SignerRegistration
11✔
311
                // message in the future.
11✔
312
                // The RegistrationInfo could also be extended to include info
11✔
313
                // about the watch-only node in the future.
11✔
314
                RegistrationComplete: &walletrpc.RegistrationComplete{
11✔
315
                        Signature:        "",
11✔
316
                        RegistrationInfo: "watch-only registration info",
11✔
317
                },
11✔
318
        }
11✔
319
        // Send a message to the client to indicate that the registration has
11✔
320
        // successfully completed.
11✔
321
        req := &walletrpc.SignCoordinatorRequest_RegistrationResponse{
11✔
322
                RegistrationResponse: &walletrpc.RegistrationResponse{
11✔
323
                        RegistrationResponseType: complete,
11✔
324
                },
11✔
325
        }
11✔
326

11✔
327
        regCompleteMsg := &walletrpc.SignCoordinatorRequest{
11✔
328
                RequestId:       handshakeRequestID,
11✔
329
                SignRequestType: req,
11✔
330
        }
11✔
331

11✔
332
        // Send the message in a goroutine because the Send method blocks until
11✔
333
        // the message is read by the client.
11✔
334
        s.wg.Add(1)
11✔
335
        go func() {
22✔
336
                defer s.wg.Done()
11✔
337

11✔
338
                err := stream.Send(regCompleteMsg)
11✔
339
                if err != nil {
11✔
NEW
340
                        select {
×
NEW
341
                        case errChan <- err:
×
NEW
342
                        case <-ctxt.Done():
×
343
                        }
344

NEW
345
                        return
×
346
                }
347

348
                close(registerDoneChan)
11✔
349
        }()
350

351
        select {
11✔
NEW
352
        case err := <-errChan:
×
NEW
353
                return fmt.Errorf("error sending registration complete "+
×
NEW
354
                        " message to remote signer: %v", err)
×
355

NEW
356
        case <-ctxt.Done():
×
NEW
357
                return ctxt.Err()
×
358

NEW
359
        case <-s.quit:
×
NEW
360
                return ErrShuttingDown
×
361

362
        case <-registerDoneChan:
11✔
363
        }
364

365
        return nil
11✔
366
}
367

368
// StartReceiving is the main receive loop that receives responses from the
369
// remote signer. Responses must have a RequestID that corresponds to requests
370
// which are waiting for a response; otherwise, the response is ignored.
371
func (s *SignCoordinator) StartReceiving() {
11✔
372
        defer s.wg.Done()
11✔
373

11✔
374
        // Signals to any ongoing requests that the remote signer is no longer
11✔
375
        // connected.
11✔
376
        defer close(s.doneReceiving)
11✔
377

11✔
378
        for {
31✔
379
                resp, err := s.stream.Recv()
20✔
380
                if err != nil {
25✔
381
                        select {
5✔
382
                        // If we've already shut down, the main Run method will
383
                        // not be able to receive any error sent over the error
384
                        // channel. So we just return.
385
                        case <-s.quit:
4✔
386

387
                        // Send the error over the error channel, so that the
388
                        // main Run method can return the error.
389
                        case s.receiveErrChan <- err:
4✔
390
                        }
391

392
                        return
5✔
393
                }
394

395
                respChan, ok := s.responses.Load(resp.GetRefRequestId())
12✔
396

12✔
397
                if ok {
22✔
398
                        select {
10✔
399
                        // We should always be able to send over the response
400
                        // channel, as the channel allows for a buffer of 1, and
401
                        // we shouldn't have multiple requests and responses for
402
                        // the same request ID.
403
                        case respChan <- resp:
10✔
404

NEW
405
                        case <-s.quit:
×
NEW
406
                                return
×
407

408
                        // The timeout case be unreachable, as we should always
409
                        // be able to send 1 response over the response channel.
410
                        // We keep this case just to avoid a scenario where the
411
                        // receive loop would be blocked if we receive multiple
412
                        // responses for the same request ID.
NEW
413
                        case <-time.After(s.requestTimeout):
×
414
                        }
415
                }
416

417
                // If there's no response channel, the thread waiting for the
418
                // response has most likely timed out. We therefore ignore the
419
                // response. The other scenario where we don't have a response
420
                // channel would be if we received a response for a request that
421
                // we didn't send. This should never happen, but if it does, we
422
                // ignore the response.
423

424
                select {
12✔
NEW
425
                case <-s.quit:
×
NEW
426
                        return
×
427
                default:
12✔
428
                }
429
        }
430
}
431

432
// WaitUntilConnected waits until the remote signer has connected. If the remote
433
// signer does not connect within the configured connection timeout, an error is
434
// returned.
435
func (s *SignCoordinator) WaitUntilConnected() error {
3✔
436
        return s.waitUntilConnectedWithTimeout(s.connectionTimeout)
3✔
437
}
3✔
438

439
// waitUntilConnectedWithTimeout waits until the remote signer has connected. If
440
// the remote signer does not connect within the given timeout, an error is
441
// returned.
442
func (s *SignCoordinator) waitUntilConnectedWithTimeout(
443
        timeout time.Duration) error {
15✔
444

15✔
445
        select {
15✔
446
        case <-s.clientConnected:
15✔
447
                return nil
15✔
448

NEW
449
        case <-s.quit:
×
NEW
450
                return ErrShuttingDown
×
451

NEW
452
        case <-time.After(timeout):
×
NEW
453
                return ErrConnectTimeout
×
454

NEW
455
        case <-s.doneReceiving:
×
NEW
456
                return ErrNotConnected
×
457
        }
458
}
459

460
// createResponseChannel creates a response channel for the given request ID and
461
// inserts it into the responses map. The function returns a cleanup function
462
// which removes the channel from the responses map, and the caller must ensure
463
// that this cleanup function is executed once the thread that's waiting for
464
// the response is done.
465
func (s *SignCoordinator) createResponseChannel(requestID uint64) func() {
15✔
466
        // Create a new response channel.
15✔
467
        respChan := make(chan *signerResponse, 1)
15✔
468

15✔
469
        // Insert the response channel into the map.
15✔
470
        s.responses.Store(requestID, respChan)
15✔
471

15✔
472
        // Create a cleanup function that will delete the response channel.
15✔
473
        return func() {
30✔
474
                select {
15✔
475
                // If we have timed out, there could be a very unlikely
476
                // scenario where we did receive a response before we managed to
477
                // grab the lock in the cleanup func. In that case, we'll just
478
                // ignore the response. We should still clean up the response
479
                // channel though.
NEW
480
                case <-respChan:
×
481
                default:
15✔
482
                }
483

484
                s.responses.Delete(requestID)
15✔
485
        }
486
}
487

488
// getResponse waits for a response with the given request ID and returns the
489
// response if it is received. If the corresponding response from the remote
490
// signer is a SignerError, the error message is returned. If the response is
491
// not received within the given timeout, an error is returned.
492
//
493
// Note: Before calling this function, the caller must have created a response
494
// channel for the request ID.
495
func (s *SignCoordinator) getResponse(requestID uint64,
496
        timeout time.Duration) (*signerResponse, error) {
15✔
497

15✔
498
        respChan, ok := s.responses.Load(requestID)
15✔
499

15✔
500
        // Verify that we have a response channel for the request ID.
15✔
501
        if !ok {
15✔
NEW
502
                // It should be impossible to reach this case, as we create the
×
NEW
503
                // response channel before sending the request.
×
NEW
504
                return nil, fmt.Errorf("no response channel found for "+
×
NEW
505
                        "request ID %d", requestID)
×
NEW
506
        }
×
507

508
        // Wait for the response to arrive.
509
        select {
15✔
510
        case resp, ok := <-respChan:
10✔
511
                if !ok {
10✔
NEW
512
                        // If the response channel was closed, we return an
×
NEW
513
                        // error as the receiving thread must have timed out
×
NEW
514
                        // before we managed to grab the response.
×
NEW
515
                        return nil, ErrRequestTimeout
×
NEW
516
                }
×
517

518
                // a temp type alias to limit the length of the line below.
519
                type sErr = walletrpc.SignCoordinatorResponse_SignerError
10✔
520

10✔
521
                // If the response is an error, we return the error message.
10✔
522
                if errorResp, ok := resp.GetSignResponseType().(*sErr); ok {
14✔
523
                        errStr := errorResp.SignerError.GetError()
4✔
524

4✔
525
                        log.Debugf("Received an error response from remote "+
4✔
526
                                "signer for request ID %d. Error: %v",
4✔
527
                                requestID, errStr)
4✔
528

4✔
529
                        return nil, errors.New(errStr)
4✔
530
                }
4✔
531

532
                log.Debugf("Received remote signer %T response for request "+
9✔
533
                        "ID %d", resp.GetSignResponseType(), requestID)
9✔
534

9✔
535
                log.Tracef("Remote signer response content: %v",
9✔
536
                        formatSignCoordinatorMsg(resp))
9✔
537

9✔
538
                return resp, nil
9✔
539

540
        case <-s.doneReceiving:
1✔
541
                log.Debugf("Stopped waiting for remote signer response for "+
1✔
542
                        "request ID %d as the stream has been closed",
1✔
543
                        requestID)
1✔
544

1✔
545
                return nil, ErrNotConnected
1✔
546

547
        case <-s.quit:
1✔
548
                log.Debugf("Stopped waiting for remote signer response for "+
1✔
549
                        "request ID %d as we're shutting down", requestID)
1✔
550

1✔
551
                return nil, ErrShuttingDown
1✔
552

553
        case <-time.After(timeout):
3✔
554
                log.Debugf("Remote signer response timed out for request ID %d",
3✔
555
                        requestID)
3✔
556

3✔
557
                return nil, ErrRequestTimeout
3✔
558
        }
559
}
560

561
// registerRequest registers a new request with the SignCoordinator, ensuring it
562
// awaits the handling of the request before shutting down. The function returns
563
// a Done function that must be executed once the request has been handled.
564
func (s *SignCoordinator) registerRequest() (func(), error) {
15✔
565
        // We lock the mutex to ensure that we can't have a race where we'd
15✔
566
        // register a request while shutting down.
15✔
567
        s.mu.Lock()
15✔
568
        defer s.mu.Unlock()
15✔
569

15✔
570
        select {
15✔
NEW
571
        case <-s.quit:
×
NEW
572
                return nil, ErrShuttingDown
×
573
        default:
15✔
574
        }
575

576
        s.wg.Add(1)
15✔
577

15✔
578
        return func() {
30✔
579
                s.wg.Done()
15✔
580
        }, nil
15✔
581
}
582

583
// Ping sends a ping request to the remote signer and waits for a pong response.
584
func (s *SignCoordinator) Ping(timeout time.Duration) (bool, error) {
15✔
585
        req := &walletrpc.SignCoordinatorRequest_Ping{
15✔
586
                Ping: true,
15✔
587
        }
15✔
588

15✔
589
        return processRequest(
15✔
590
                s, timeout, // As we're pinging, we specify a time limit.
15✔
591
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
30✔
592
                        return walletrpc.SignCoordinatorRequest{
15✔
593
                                RequestId:       reqId,
15✔
594
                                SignRequestType: req,
15✔
595
                        }
15✔
596
                },
15✔
597
                func(resp *signerResponse) bool {
9✔
598
                        return resp.GetPong()
9✔
599
                },
9✔
600
        )
601
}
602

603
// DeriveSharedKey sends a SharedKeyRequest to the remote signer and waits for
604
// the corresponding response.
605
//
606
// NOTE: This is part of the RemoteSignerRequests interface.
607
func (s *SignCoordinator) DeriveSharedKey(_ context.Context,
608
        in *signrpc.SharedKeyRequest,
609
        _ ...grpc.CallOption) (*signrpc.SharedKeyResponse, error) {
3✔
610

3✔
611
        req := &walletrpc.SignCoordinatorRequest_SharedKeyRequest{
3✔
612
                SharedKeyRequest: in,
3✔
613
        }
3✔
614

3✔
615
        return processRequest(
3✔
616
                s, noTimeout,
3✔
617
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
618
                        return walletrpc.SignCoordinatorRequest{
3✔
619
                                RequestId:       reqId,
3✔
620
                                SignRequestType: req,
3✔
621
                        }
3✔
622
                },
3✔
623
                func(resp *signerResponse) *signrpc.SharedKeyResponse {
3✔
624
                        return resp.GetSharedKeyResponse()
3✔
625
                },
3✔
626
        )
627
}
628

629
// MuSig2Cleanup sends a MuSig2CleanupRequest to the remote signer and waits for
630
// the corresponding response.
631
//
632
// NOTE: This is part of the RemoteSignerRequests interface.
633
func (s *SignCoordinator) MuSig2Cleanup(_ context.Context,
634
        in *signrpc.MuSig2CleanupRequest,
635
        _ ...grpc.CallOption) (*signrpc.MuSig2CleanupResponse, error) {
3✔
636

3✔
637
        req := &walletrpc.SignCoordinatorRequest_MuSig2CleanupRequest{
3✔
638
                MuSig2CleanupRequest: in,
3✔
639
        }
3✔
640

3✔
641
        return processRequest(
3✔
642
                s, noTimeout,
3✔
643
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
644
                        return walletrpc.SignCoordinatorRequest{
3✔
645
                                RequestId:       reqId,
3✔
646
                                SignRequestType: req,
3✔
647
                        }
3✔
648
                },
3✔
649
                func(resp *signerResponse) *signrpc.MuSig2CleanupResponse {
3✔
650
                        return resp.GetMuSig2CleanupResponse()
3✔
651
                },
3✔
652
        )
653
}
654

655
// MuSig2CombineSig sends a MuSig2CombineSigRequest to the remote signer and
656
// waits for the corresponding response.
657
//
658
// NOTE: This is part of the RemoteSignerRequests interface.
659
func (s *SignCoordinator) MuSig2CombineSig(_ context.Context,
660
        in *signrpc.MuSig2CombineSigRequest,
661
        _ ...grpc.CallOption) (*signrpc.MuSig2CombineSigResponse, error) {
3✔
662

3✔
663
        req := &walletrpc.SignCoordinatorRequest_MuSig2CombineSigRequest{
3✔
664
                MuSig2CombineSigRequest: in,
3✔
665
        }
3✔
666

3✔
667
        return processRequest(
3✔
668
                s, noTimeout,
3✔
669
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
670
                        return walletrpc.SignCoordinatorRequest{
3✔
671
                                RequestId:       reqId,
3✔
672
                                SignRequestType: req,
3✔
673
                        }
3✔
674
                },
3✔
675
                func(resp *signerResponse) *signrpc.MuSig2CombineSigResponse {
3✔
676
                        return resp.GetMuSig2CombineSigResponse()
3✔
677
                },
3✔
678
        )
679
}
680

681
// MuSig2CreateSession sends a MuSig2SessionRequest to the remote signer and
682
// waits for the corresponding response.
683
//
684
// NOTE: This is part of the RemoteSignerRequests interface.
685
func (s *SignCoordinator) MuSig2CreateSession(_ context.Context,
686
        in *signrpc.MuSig2SessionRequest,
687
        _ ...grpc.CallOption) (*signrpc.MuSig2SessionResponse, error) {
3✔
688

3✔
689
        req := &walletrpc.SignCoordinatorRequest_MuSig2SessionRequest{
3✔
690
                MuSig2SessionRequest: in,
3✔
691
        }
3✔
692

3✔
693
        return processRequest(
3✔
694
                s, noTimeout,
3✔
695
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
696
                        return walletrpc.SignCoordinatorRequest{
3✔
697
                                RequestId:       reqId,
3✔
698
                                SignRequestType: req,
3✔
699
                        }
3✔
700
                },
3✔
701
                func(resp *signerResponse) *signrpc.MuSig2SessionResponse {
3✔
702
                        return resp.GetMuSig2SessionResponse()
3✔
703
                },
3✔
704
        )
705
}
706

707
// MuSig2RegisterNonces sends a MuSig2RegisterNoncesRequest to the remote signer
708
// and waits for the corresponding response.
709
//
710
// NOTE: This is part of the RemoteSignerRequests interface.
711
func (s *SignCoordinator) MuSig2RegisterNonces(_ context.Context,
712
        in *signrpc.MuSig2RegisterNoncesRequest,
713
        _ ...grpc.CallOption) (*signrpc.MuSig2RegisterNoncesResponse,
714
        error) {
3✔
715

3✔
716
        req := &walletrpc.SignCoordinatorRequest_MuSig2RegisterNoncesRequest{
3✔
717
                MuSig2RegisterNoncesRequest: in,
3✔
718
        }
3✔
719

3✔
720
        type muSig2RegisterNoncesResp = *signrpc.MuSig2RegisterNoncesResponse
3✔
721

3✔
722
        return processRequest(
3✔
723
                s, noTimeout,
3✔
724
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
725
                        return walletrpc.SignCoordinatorRequest{
3✔
726
                                RequestId:       reqId,
3✔
727
                                SignRequestType: req,
3✔
728
                        }
3✔
729
                },
3✔
730
                func(resp *signerResponse) muSig2RegisterNoncesResp {
3✔
731
                        return resp.GetMuSig2RegisterNoncesResponse()
3✔
732
                },
3✔
733
        )
734
}
735

736
// MuSig2Sign sends a MuSig2SignRequest to the remote signer and waits for the
737
// corresponding response.
738
//
739
// NOTE: This is part of the RemoteSignerRequests interface.
740
func (s *SignCoordinator) MuSig2Sign(_ context.Context,
741
        in *signrpc.MuSig2SignRequest,
742
        _ ...grpc.CallOption) (*signrpc.MuSig2SignResponse, error) {
3✔
743

3✔
744
        req := &walletrpc.SignCoordinatorRequest_MuSig2SignRequest{
3✔
745
                MuSig2SignRequest: in,
3✔
746
        }
3✔
747

3✔
748
        return processRequest(
3✔
749
                s, noTimeout,
3✔
750
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
751
                        return walletrpc.SignCoordinatorRequest{
3✔
752
                                RequestId:       reqId,
3✔
753
                                SignRequestType: req,
3✔
754
                        }
3✔
755
                },
3✔
756
                func(resp *signerResponse) *signrpc.MuSig2SignResponse {
3✔
757
                        return resp.GetMuSig2SignResponse()
3✔
758
                },
3✔
759
        )
760
}
761

762
// SignMessage sends a SignMessageReq to the remote signer and waits for the
763
// corresponding response.
764
//
765
// NOTE: This is part of the RemoteSignerRequests interface.
766
func (s *SignCoordinator) SignMessage(_ context.Context,
767
        in *signrpc.SignMessageReq,
768
        _ ...grpc.CallOption) (*signrpc.SignMessageResp, error) {
3✔
769

3✔
770
        req := &walletrpc.SignCoordinatorRequest_SignMessageReq{
3✔
771
                SignMessageReq: in,
3✔
772
        }
3✔
773

3✔
774
        return processRequest(
3✔
775
                s, noTimeout,
3✔
776
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
777
                        return walletrpc.SignCoordinatorRequest{
3✔
778
                                RequestId:       reqId,
3✔
779
                                SignRequestType: req,
3✔
780
                        }
3✔
781
                },
3✔
782
                func(resp *signerResponse) *signrpc.SignMessageResp {
3✔
783
                        return resp.GetSignMessageResp()
3✔
784
                },
3✔
785
        )
786
}
787

788
// SignPsbt sends a SignPsbtRequest to the remote signer and waits for the
789
// corresponding response.
790
//
791
// NOTE: This is part of the RemoteSignerRequests interface.
792
func (s *SignCoordinator) SignPsbt(_ context.Context,
793
        in *walletrpc.SignPsbtRequest,
794
        _ ...grpc.CallOption) (*walletrpc.SignPsbtResponse, error) {
3✔
795

3✔
796
        req := &walletrpc.SignCoordinatorRequest_SignPsbtRequest{
3✔
797
                SignPsbtRequest: in,
3✔
798
        }
3✔
799

3✔
800
        return processRequest(
3✔
801
                s, noTimeout,
3✔
802
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
6✔
803
                        return walletrpc.SignCoordinatorRequest{
3✔
804
                                RequestId:       reqId,
3✔
805
                                SignRequestType: req,
3✔
806
                        }
3✔
807
                },
3✔
808
                func(resp *signerResponse) *walletrpc.SignPsbtResponse {
3✔
809
                        return resp.GetSignPsbtResponse()
3✔
810
                },
3✔
811
        )
812
}
813

814
// processRequest is a generic function that sends a request to the remote
815
// signer and waits for the corresponding response. If a timeout is set, the
816
// function will limit the execution time of the entire function to the
817
// specified timeout. If it is not set, configured timeouts will be used for
818
// the individual operations within the function.
819
func processRequest[R comparable](s *SignCoordinator, timeout time.Duration,
820
        generateRequest func(uint64) walletrpc.SignCoordinatorRequest,
821
        extractResponse func(*signerResponse) R) (R, error) {
15✔
822

15✔
823
        var zero R
15✔
824

15✔
825
        done, err := s.registerRequest()
15✔
826
        if err != nil {
15✔
NEW
827
                return zero, err
×
NEW
828
        }
×
829
        defer done()
15✔
830

15✔
831
        startTime := time.Now()
15✔
832

15✔
833
        // If a timeout is enforced, we will wait for the connection using the
15✔
834
        // specified timeout. Otherwise, we will wait for the connection using
15✔
835
        // the configured connection timeout.
15✔
836
        if timeout != 0 {
30✔
837
                err = s.waitUntilConnectedWithTimeout(timeout)
15✔
838
        } else {
18✔
839
                err = s.WaitUntilConnected()
3✔
840
        }
3✔
841

842
        if err != nil {
15✔
NEW
843
                return zero, err
×
NEW
844
        }
×
845

846
        reqID := s.nextRequestID.Add(1)
15✔
847
        req := generateRequest(reqID)
15✔
848

15✔
849
        cleanUpChannel := s.createResponseChannel(reqID)
15✔
850
        defer cleanUpChannel()
15✔
851

15✔
852
        log.Debugf("Sending a %T to the remote signer with request ID %d",
15✔
853
                req.SignRequestType, reqID)
15✔
854

15✔
855
        log.Tracef("Request content: %v", formatSignCoordinatorMsg(&req))
15✔
856

15✔
857
        err = s.stream.Send(&req)
15✔
858
        if err != nil {
15✔
NEW
859
                return zero, err
×
NEW
860
        }
×
861

862
        var resp *walletrpc.SignCoordinatorResponse
15✔
863

15✔
864
        // If a timeout is enforced, we need to limit the entire execution time
15✔
865
        // of this function to the timeout. Therefore, we need to calculate the
15✔
866
        // remaining allowed execution time.
15✔
867
        // If no timeout is enforced, we will wait for the response using the
15✔
868
        // configured request timeout.
15✔
869
        if timeout != 0 {
30✔
870
                newTimeout := timeout - time.Since(startTime)
15✔
871

15✔
872
                if time.Since(startTime) > timeout {
15✔
NEW
873
                        return zero, ErrRequestTimeout
×
NEW
874
                }
×
875

876
                resp, err = s.getResponse(reqID, newTimeout)
15✔
877
        } else {
3✔
878
                resp, err = s.getResponse(reqID, s.requestTimeout)
3✔
879
        }
3✔
880

881
        if err != nil {
24✔
882
                return zero, err
9✔
883
        }
9✔
884

885
        rpcResp := extractResponse(resp)
9✔
886
        if rpcResp == zero {
9✔
NEW
887
                return zero, ErrUnexpectedResponse
×
NEW
888
        }
×
889

890
        return rpcResp, nil
9✔
891
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc