• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11963610117

21 Nov 2024 11:38PM UTC coverage: 59.117% (+0.1%) from 58.98%
11963610117

Pull #8754

github

ViktorTigerstrom
itest: wrap deriveCustomScopeAccounts at 80 chars

This commit fixes the word wrapping for the deriveCustomScopeAccounts
function docs, and ensures that it wraps at 80 characters or less.
Pull Request #8754: Add `Outbound` Remote Signer implementation

1950 of 2984 new or added lines in 44 files covered. (65.35%)

200 existing lines in 39 files now uncovered.

134504 of 227522 relevant lines covered (59.12%)

19449.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.2
/lnwallet/rpcwallet/sign_coordinator.go
1
package rpcwallet
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/lnrpc/signrpc"
12
        "github.com/lightningnetwork/lnd/lnrpc/walletrpc"
13
        "github.com/lightningnetwork/lnd/lnutils"
14
        "google.golang.org/grpc"
15
)
16

17
var (
	// ErrRequestTimeout is returned when a response from the remote signer
	// does not arrive before the timeout for the request expires.
	ErrRequestTimeout = errors.New("remote signer response timeout reached")

	// ErrConnectTimeout is returned when the remote signer has not
	// connected before the connection timeout expires.
	ErrConnectTimeout = errors.New(
		"timed out when waiting for remote signer to connect",
	)

	// ErrMultipleConnections is returned to a remote signer that attempts
	// to connect while another remote signer is already connected.
	ErrMultipleConnections = errors.New(
		"only one remote signer can be connected",
	)

	// ErrNotConnected is returned when the remote signer has closed the
	// stream, or when receiving over the stream has failed.
	ErrNotConnected = errors.New("the remote signer is no longer connected")

	// ErrUnexpectedResponse is returned when the response carrying the
	// expected request ID is of a type that does not match the request.
	ErrUnexpectedResponse = errors.New("unexpected response type")
)
43

44
// noTimeout is the value passed as a timeout to indicate that no timeout
// should be enforced when waiting for a response from the remote signer.
const noTimeout = 0
50

51
// SignCoordinator is an implementation of the signrpc.SignerClient and the
52
// walletrpc.WalletKitClient interfaces that passes on all requests to a remote
53
// signer. It is used by the watch-only wallet to delegate any signing or ECDH
54
// operations to a remote node over a
55
// walletrpc.WalletKit_SignCoordinatorStreamsServer stream. The stream is set up
56
// by the remote signer when it connects to the watch-only wallet, which should
57
// execute the Run method.
58
type SignCoordinator struct {
59
        // nextRequestID keeps track of the next request ID that should
60
        // be used when sending a request to the remote signer.
61
        nextRequestID atomic.Uint64
62

63
        // stream is a bi-directional stream between us and the remote signer.
64
        stream StreamServer
65

66
        // responses is a map of request IDs to response channels. This map
67
        // should be populated with a response channel for each request that has
68
        // been sent to the remote signer. The response channel should be
69
        // inserted into the map before the request is sent.
70
        // Any response received over the stream that does not have an
71
        // associated response channel in this map is ignored.
72
        // The response channel should be removed from the map when the response
73
        // has been received and processed.
74
        responses *lnutils.SyncMap[uint64, chan *signerResponse]
75

76
        // receiveErrChan is used to signal that the stream with the remote
77
        // signer has errored, and we can no longer process responses.
78
        receiveErrChan chan error
79

80
        // doneReceiving is closed when either party terminates and signals to
81
        // any pending requests that we'll no longer process the response for
82
        // that request.
83
        doneReceiving chan struct{}
84

85
        // quit is closed when lnd is shutting down.
86
        quit chan struct{}
87

88
        // clientConnected is sent over when the remote signer connects.
89
        clientConnected chan struct{}
90

91
        // requestTimeout is the maximum time we will wait for a response from
92
        // the remote signer.
93
        requestTimeout time.Duration
94

95
        // connectionTimeout is the maximum time we will wait for the remote
96
        // signer to connect.
97
        connectionTimeout time.Duration
98

99
        mu sync.Mutex
100

101
        wg sync.WaitGroup
102
}
103

104
// A compile time assertion to ensure SignCoordinator meets the
105
// RemoteSignerRequests interface.
106
var _ RemoteSignerRequests = (*SignCoordinator)(nil)
107

108
// NewSignCoordinator creates a new instance of the SignCoordinator.
109
func NewSignCoordinator(requestTimeout time.Duration,
110
        connectionTimeout time.Duration) *SignCoordinator {
12✔
111

12✔
112
        respsMap := &lnutils.SyncMap[uint64, chan *signerResponse]{}
12✔
113

12✔
114
        s := &SignCoordinator{
12✔
115
                responses:         respsMap,
12✔
116
                receiveErrChan:    make(chan error, 1),
12✔
117
                doneReceiving:     make(chan struct{}),
12✔
118
                clientConnected:   make(chan struct{}),
12✔
119
                quit:              make(chan struct{}),
12✔
120
                requestTimeout:    requestTimeout,
12✔
121
                connectionTimeout: connectionTimeout,
12✔
122
        }
12✔
123

12✔
124
        // We initialize the atomic nextRequestID to the handshakeRequestID, as
12✔
125
        // requestID 1 is reserved for the initial handshake by the remote
12✔
126
        // signer.
12✔
127
        s.nextRequestID.Store(handshakeRequestID)
12✔
128

12✔
129
        return s
12✔
130
}
12✔
131

132
// Run starts the SignCoordinator and blocks until the remote signer
133
// disconnects, the SignCoordinator is shut down, or an error occurs.
134
func (s *SignCoordinator) Run(stream StreamServer) error {
12✔
135
        s.mu.Lock()
12✔
136

12✔
137
        select {
12✔
NEW
138
        case <-s.quit:
×
NEW
139
                s.mu.Unlock()
×
NEW
140
                return ErrShuttingDown
×
141

NEW
142
        case <-s.doneReceiving:
×
NEW
143
                s.mu.Unlock()
×
NEW
144
                return ErrNotConnected
×
145

146
        default:
12✔
147
        }
148

149
        s.wg.Add(1)
12✔
150
        defer s.wg.Done()
12✔
151

12✔
152
        // If we already have a stream, we error out as we can only have one
12✔
153
        // connection throughout the lifetime of the SignCoordinator.
12✔
154
        if s.stream != nil {
12✔
NEW
155
                s.mu.Unlock()
×
NEW
156
                return ErrMultipleConnections
×
NEW
157
        }
×
158

159
        s.stream = stream
12✔
160

12✔
161
        s.mu.Unlock()
12✔
162

12✔
163
        // The handshake must be completed before we can start sending requests
12✔
164
        // to the remote signer.
12✔
165
        err := s.handshake(stream)
12✔
166
        if err != nil {
12✔
NEW
167
                return err
×
NEW
168
        }
×
169

170
        log.Infof("Remote signer connected")
12✔
171
        close(s.clientConnected)
12✔
172

12✔
173
        // Now let's start the main receiving loop, which will receive all
12✔
174
        // responses to our requests from the remote signer!
12✔
175
        // We start the receiving loop in a goroutine to ensure that this
12✔
176
        // function exits if the SignCoordinator is shut down (i.e. the s.quit
12✔
177
        // channel is closed). Returning from this function will cause the
12✔
178
        // stream to be closed, which in turn will cause the receiving loop to
12✔
179
        // exit.
12✔
180
        s.wg.Add(1)
12✔
181
        go s.StartReceiving()
12✔
182

12✔
183
        select {
12✔
184
        case err := <-s.receiveErrChan:
4✔
185
                return err
4✔
186

187
        case <-s.quit:
5✔
188
                return ErrShuttingDown
5✔
189

NEW
190
        case <-s.doneReceiving:
×
NEW
191
                return ErrNotConnected
×
192
        }
193
}
194

195
// Stop shuts down the SignCoordinator and waits until the main receiving loop
196
// has exited and all pending requests have been terminated.
197
func (s *SignCoordinator) Stop() {
5✔
198
        log.Infof("Stopping Sign Coordinator")
5✔
199
        defer log.Debugf("Sign coordinator stopped")
5✔
200

5✔
201
        // We lock the mutex before closing the quit channel to ensure that we
5✔
202
        // can't get a concurrent request into the SignCoordinator while we're
5✔
203
        // stopping it. That will ensure that the s.wg.Wait() call below will
5✔
204
        // always wait for any ongoing requests to finish before we return.
5✔
205
        s.mu.Lock()
5✔
206

5✔
207
        close(s.quit)
5✔
208

5✔
209
        s.mu.Unlock()
5✔
210

5✔
211
        s.wg.Wait()
5✔
212
}
5✔
213

214
// handshake performs the initial handshake with the remote signer. This must
215
// be done before any other requests are sent to the remote signer.
216
func (s *SignCoordinator) handshake(stream StreamServer) error {
12✔
217
        var (
12✔
218
                registerChan     = make(chan *walletrpc.SignerRegistration)
12✔
219
                registerDoneChan = make(chan struct{})
12✔
220
                errChan          = make(chan error)
12✔
221
        )
12✔
222

12✔
223
        // Create a context with a timeout using the context from the stream as
12✔
224
        // the parent context. This ensures that we'll exit if either the stream
12✔
225
        // is closed by the remote signer or if we time out.
12✔
226
        ctxt, cancel := context.WithTimeout(
12✔
227
                stream.Context(), s.requestTimeout,
12✔
228
        )
12✔
229
        defer cancel()
12✔
230

12✔
231
        // Read the first message in a goroutine because the Recv method blocks
12✔
232
        // until the message arrives.
12✔
233
        s.wg.Add(1)
12✔
234
        go func() {
24✔
235
                defer s.wg.Done()
12✔
236

12✔
237
                msg, err := stream.Recv()
12✔
238
                if err != nil {
12✔
NEW
239
                        select {
×
NEW
240
                        case errChan <- err:
×
NEW
241
                        case <-ctxt.Done():
×
242
                        }
243

NEW
244
                        return
×
245
                }
246

247
                if msg.GetRefRequestId() != handshakeRequestID {
12✔
NEW
248
                        err = fmt.Errorf("initial request ID must be %d, "+
×
NEW
249
                                "but is: %d", handshakeRequestID,
×
NEW
250
                                msg.GetRefRequestId())
×
NEW
251

×
NEW
252
                        select {
×
NEW
253
                        case errChan <- err:
×
NEW
254
                        case <-ctxt.Done():
×
255
                        }
256

NEW
257
                        return
×
258
                }
259

260
                switch req := msg.GetSignResponseType().(type) {
12✔
261
                case *walletrpc.SignCoordinatorResponse_SignerRegistration:
12✔
262
                        select {
12✔
263
                        case registerChan <- req.SignerRegistration:
12✔
NEW
264
                        case <-ctxt.Done():
×
265
                        }
266

267
                        return
12✔
268

NEW
269
                default:
×
NEW
270
                        err := fmt.Errorf("expected registration message, "+
×
NEW
271
                                "but got: %T", req)
×
NEW
272

×
NEW
273
                        select {
×
NEW
274
                        case errChan <- err:
×
NEW
275
                        case <-ctxt.Done():
×
276
                        }
277

NEW
278
                        return
×
279
                }
280
        }()
281

282
        // Wait for the initial message to arrive or time out if it takes too
283
        // long. The initial message must be a registration message from the
284
        // remote signer.
285
        select {
12✔
286
        case signerRegistration := <-registerChan:
12✔
287
                // TODO(viktor): This could be extended to validate the version
12✔
288
                // of the remote signer in the future.
12✔
289
                if signerRegistration.GetRegistrationInfo() == "" {
12✔
NEW
290
                        return errors.New("invalid remote signer " +
×
NEW
291
                                "registration info")
×
NEW
292
                }
×
293

294
                // Todo(viktor): The RegistrationChallenge in the
295
                // signerRegistration should likely also be signed here.
296

NEW
297
        case err := <-errChan:
×
NEW
298
                return fmt.Errorf("error receiving initial remote signer "+
×
NEW
299
                        "registration message: %v", err)
×
300

NEW
301
        case <-s.quit:
×
NEW
302
                return ErrShuttingDown
×
303

NEW
304
        case <-ctxt.Done():
×
NEW
305
                return ctxt.Err()
×
306
        }
307

308
        complete := &walletrpc.RegistrationResponse_RegistrationComplete{
12✔
309
                // TODO(viktor): The signature should be generated by signing
12✔
310
                // the RegistrationChallenge contained in the SignerRegistration
12✔
311
                // message in the future.
12✔
312
                // The RegistrationInfo could also be extended to include info
12✔
313
                // about the watch-only node in the future.
12✔
314
                RegistrationComplete: &walletrpc.RegistrationComplete{
12✔
315
                        Signature:        "",
12✔
316
                        RegistrationInfo: "outboundWatchOnly",
12✔
317
                },
12✔
318
        }
12✔
319
        // Send a message to the client to indicate that the registration has
12✔
320
        // successfully completed.
12✔
321
        req := &walletrpc.SignCoordinatorRequest_RegistrationResponse{
12✔
322
                RegistrationResponse: &walletrpc.RegistrationResponse{
12✔
323
                        RegistrationResponseType: complete,
12✔
324
                },
12✔
325
        }
12✔
326

12✔
327
        regCompleteMsg := &walletrpc.SignCoordinatorRequest{
12✔
328
                RequestId:       handshakeRequestID,
12✔
329
                SignRequestType: req,
12✔
330
        }
12✔
331

12✔
332
        // Send the message in a goroutine because the Send method blocks until
12✔
333
        // the message is read by the client.
12✔
334
        s.wg.Add(1)
12✔
335
        go func() {
24✔
336
                defer s.wg.Done()
12✔
337

12✔
338
                err := stream.Send(regCompleteMsg)
12✔
339
                if err != nil {
12✔
NEW
340
                        select {
×
NEW
341
                        case errChan <- err:
×
NEW
342
                        case <-ctxt.Done():
×
343
                        }
344

NEW
345
                        return
×
346
                }
347

348
                close(registerDoneChan)
12✔
349
        }()
350

351
        select {
12✔
NEW
352
        case err := <-errChan:
×
NEW
353
                return fmt.Errorf("error sending registration complete "+
×
NEW
354
                        " message to remote signer: %v", err)
×
355

NEW
356
        case <-ctxt.Done():
×
NEW
357
                return ctxt.Err()
×
358

NEW
359
        case <-s.quit:
×
NEW
360
                return ErrShuttingDown
×
361

362
        case <-registerDoneChan:
12✔
363
        }
364

365
        return nil
12✔
366
}
367

368
// StartReceiving is the main receive loop that receives responses from the
369
// remote signer. Responses must have a RequestID that corresponds to requests
370
// which are waiting for a response; otherwise, the response is ignored.
371
func (s *SignCoordinator) StartReceiving() {
12✔
372
        defer s.wg.Done()
12✔
373

12✔
374
        // Signals to any ongoing requests that the remote signer is no longer
12✔
375
        // connected.
12✔
376
        defer close(s.doneReceiving)
12✔
377

12✔
378
        for {
33✔
379
                resp, err := s.stream.Recv()
21✔
380
                if err != nil {
27✔
381
                        select {
6✔
382
                        // If we've already shut down, the main Run method will
383
                        // not be able to receive any error sent over the error
384
                        // channel. So we just return.
385
                        case <-s.quit:
5✔
386

387
                        // Send the error over the error channel, so that the
388
                        // main Run method can return the error.
389
                        case s.receiveErrChan <- err:
5✔
390
                        }
391

392
                        return
6✔
393
                }
394

395
                respChan, ok := s.responses.Load(resp.GetRefRequestId())
13✔
396

13✔
397
                if ok {
24✔
398
                        select {
11✔
399
                        // We should always be able to send over the response
400
                        // channel, as the channel allows for a buffer of 1, and
401
                        // we shouldn't have multiple requests and responses for
402
                        // the same request ID.
403
                        case respChan <- resp:
11✔
404

NEW
405
                        case <-s.quit:
×
NEW
406
                                return
×
407

408
                        // The timeout case be unreachable, as we should always
409
                        // be able to send 1 response over the response channel.
410
                        // We keep this case just to avoid a scenario where the
411
                        // receive loop would be blocked if we receive multiple
412
                        // responses for the same request ID.
NEW
413
                        case <-time.After(s.requestTimeout):
×
414
                        }
415
                }
416

417
                // If there's no response channel, the thread waiting for the
418
                // response has most likely timed out. We therefore ignore the
419
                // response. The other scenario where we don't have a response
420
                // channel would be if we received a response for a request that
421
                // we didn't send. This should never happen, but if it does, we
422
                // ignore the response.
423

424
                select {
13✔
NEW
425
                case <-s.quit:
×
NEW
426
                        return
×
427
                default:
13✔
428
                }
429
        }
430
}
431

432
// WaitUntilConnected waits until the remote signer has connected. If the remote
433
// signer does not connect within the configured connection timeout, an error is
434
// returned.
435
func (s *SignCoordinator) WaitUntilConnected() error {
4✔
436
        return s.waitUntilConnectedWithTimeout(s.connectionTimeout)
4✔
437
}
4✔
438

439
// waitUntilConnectedWithTimeout waits until the remote signer has connected. If
440
// the remote signer does not connect within the given timeout, an error is
441
// returned.
442
func (s *SignCoordinator) waitUntilConnectedWithTimeout(
443
        timeout time.Duration) error {
16✔
444

16✔
445
        select {
16✔
446
        case <-s.clientConnected:
16✔
447
                return nil
16✔
448

NEW
449
        case <-s.quit:
×
NEW
450
                return ErrShuttingDown
×
451

NEW
452
        case <-time.After(timeout):
×
NEW
453
                return ErrConnectTimeout
×
454

455
        case <-s.doneReceiving:
1✔
456
                return ErrNotConnected
1✔
457
        }
458
}
459

460
// createResponseChannel creates a response channel for the given request ID and
461
// inserts it into the responses map. The function returns a cleanup function
462
// which removes the channel from the responses map, and the caller must ensure
463
// that this cleanup function is executed once the thread that's waiting for
464
// the response is done.
465
func (s *SignCoordinator) createResponseChannel(requestID uint64) func() {
16✔
466
        // Create a new response channel.
16✔
467
        respChan := make(chan *signerResponse, 1)
16✔
468

16✔
469
        // Insert the response channel into the map.
16✔
470
        s.responses.Store(requestID, respChan)
16✔
471

16✔
472
        // Create a cleanup function that will delete the response channel.
16✔
473
        return func() {
32✔
474
                select {
16✔
475
                // If we have timed out, there could be a very unlikely
476
                // scenario where we did receive a response before we managed to
477
                // grab the lock in the cleanup func. In that case, we'll just
478
                // ignore the response. We should still clean up the response
479
                // channel though.
NEW
480
                case <-respChan:
×
481
                default:
16✔
482
                }
483

484
                s.responses.Delete(requestID)
16✔
485
        }
486
}
487

488
// getResponse waits for a response with the given request ID and returns the
489
// response if it is received. If the corresponding response from the remote
490
// signer is a SignerError, the error message is returned. If the response is
491
// not received within the given timeout, an error is returned.
492
//
493
// Note: Before calling this function, the caller must have created a response
494
// channel for the request ID.
495
func (s *SignCoordinator) getResponse(requestID uint64,
496
        timeout time.Duration) (*signerResponse, error) {
16✔
497

16✔
498
        respChan, ok := s.responses.Load(requestID)
16✔
499

16✔
500
        // Verify that we have a response channel for the request ID.
16✔
501
        if !ok {
16✔
NEW
502
                // It should be impossible to reach this case, as we create the
×
NEW
503
                // response channel before sending the request.
×
NEW
504
                return nil, fmt.Errorf("no response channel found for "+
×
NEW
505
                        "request ID %d", requestID)
×
NEW
506
        }
×
507

508
        // Wait for the response to arrive.
509
        select {
16✔
510
        case resp, ok := <-respChan:
11✔
511
                if !ok {
11✔
NEW
512
                        // If the response channel was closed, we return an
×
NEW
513
                        // error as the receiving thread must have timed out
×
NEW
514
                        // before we managed to grab the response.
×
NEW
515
                        return nil, ErrRequestTimeout
×
NEW
516
                }
×
517

518
                // a temp type alias to limit the length of the line below.
519
                type sErr = walletrpc.SignCoordinatorResponse_SignerError
11✔
520

11✔
521
                // If the response is an error, we return the error message.
11✔
522
                if errorResp, ok := resp.GetSignResponseType().(*sErr); ok {
16✔
523
                        errStr := errorResp.SignerError.GetError()
5✔
524

5✔
525
                        log.Debugf("Received an error response from remote "+
5✔
526
                                "signer for request ID %d. Error: %v",
5✔
527
                                requestID, errStr)
5✔
528

5✔
529
                        return nil, errors.New(errStr)
5✔
530
                }
5✔
531

532
                log.Debugf("Received remote signer %T response for request "+
10✔
533
                        "ID %d", resp.GetSignResponseType(), requestID)
10✔
534

10✔
535
                log.Tracef("Remote signer response content: %v",
10✔
536
                        formatSignCoordinatorMsg(resp))
10✔
537

10✔
538
                return resp, nil
10✔
539

540
        case <-s.doneReceiving:
1✔
541
                log.Debugf("Stopped waiting for remote signer response for "+
1✔
542
                        "request ID %d as the stream has been closed",
1✔
543
                        requestID)
1✔
544

1✔
545
                return nil, ErrNotConnected
1✔
546

547
        case <-s.quit:
1✔
548
                log.Debugf("Stopped waiting for remote signer response for "+
1✔
549
                        "request ID %d as we're shutting down", requestID)
1✔
550

1✔
551
                return nil, ErrShuttingDown
1✔
552

553
        case <-time.After(timeout):
3✔
554
                log.Debugf("Remote signer response timed out for request ID %d",
3✔
555
                        requestID)
3✔
556

3✔
557
                return nil, ErrRequestTimeout
3✔
558
        }
559
}
560

561
// registerRequest registers a new request with the SignCoordinator, ensuring it
562
// awaits the handling of the request before shutting down. The function returns
563
// a Done function that must be executed once the request has been handled.
564
func (s *SignCoordinator) registerRequest() (func(), error) {
16✔
565
        // We lock the mutex to ensure that we can't have a race where we'd
16✔
566
        // register a request while shutting down.
16✔
567
        s.mu.Lock()
16✔
568
        defer s.mu.Unlock()
16✔
569

16✔
570
        select {
16✔
NEW
571
        case <-s.quit:
×
NEW
572
                return nil, ErrShuttingDown
×
573
        default:
16✔
574
        }
575

576
        s.wg.Add(1)
16✔
577

16✔
578
        return func() {
32✔
579
                s.wg.Done()
16✔
580
        }, nil
16✔
581
}
582

583
// Ping sends a ping request to the remote signer and waits for a pong response.
584
func (s *SignCoordinator) Ping(timeout time.Duration) (bool, error) {
16✔
585
        req := &walletrpc.SignCoordinatorRequest_Ping{
16✔
586
                Ping: true,
16✔
587
        }
16✔
588

16✔
589
        return processRequest(
16✔
590
                s, timeout, // As we're pinging, we specify a time limit.
16✔
591
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
32✔
592
                        return walletrpc.SignCoordinatorRequest{
16✔
593
                                RequestId:       reqId,
16✔
594
                                SignRequestType: req,
16✔
595
                        }
16✔
596
                },
16✔
597
                func(resp *signerResponse) bool {
10✔
598
                        return resp.GetPong()
10✔
599
                },
10✔
600
        )
601
}
602

603
// DeriveSharedKey sends a SharedKeyRequest to the remote signer and waits for
604
// the corresponding response.
605
//
606
// NOTE: This is part of the RemoteSignerRequests interface.
607
func (s *SignCoordinator) DeriveSharedKey(_ context.Context,
608
        in *signrpc.SharedKeyRequest,
609
        _ ...grpc.CallOption) (*signrpc.SharedKeyResponse, error) {
4✔
610

4✔
611
        req := &walletrpc.SignCoordinatorRequest_SharedKeyRequest{
4✔
612
                SharedKeyRequest: in,
4✔
613
        }
4✔
614

4✔
615
        return processRequest(
4✔
616
                s, noTimeout,
4✔
617
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
8✔
618
                        return walletrpc.SignCoordinatorRequest{
4✔
619
                                RequestId:       reqId,
4✔
620
                                SignRequestType: req,
4✔
621
                        }
4✔
622
                },
4✔
623
                func(resp *signerResponse) *signrpc.SharedKeyResponse {
4✔
624
                        return resp.GetSharedKeyResponse()
4✔
625
                },
4✔
626
        )
627
}
628

629
// MuSig2Cleanup sends a MuSig2CleanupRequest to the remote signer and waits for
630
// the corresponding response.
631
//
632
// NOTE: This is part of the RemoteSignerRequests interface.
633
func (s *SignCoordinator) MuSig2Cleanup(_ context.Context,
634
        in *signrpc.MuSig2CleanupRequest,
635
        _ ...grpc.CallOption) (*signrpc.MuSig2CleanupResponse, error) {
4✔
636

4✔
637
        req := &walletrpc.SignCoordinatorRequest_MuSig2CleanupRequest{
4✔
638
                MuSig2CleanupRequest: in,
4✔
639
        }
4✔
640

4✔
641
        return processRequest(
4✔
642
                s, noTimeout,
4✔
643
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
8✔
644
                        return walletrpc.SignCoordinatorRequest{
4✔
645
                                RequestId:       reqId,
4✔
646
                                SignRequestType: req,
4✔
647
                        }
4✔
648
                },
4✔
649
                func(resp *signerResponse) *signrpc.MuSig2CleanupResponse {
4✔
650
                        return resp.GetMuSig2CleanupResponse()
4✔
651
                },
4✔
652
        )
653
}
654

655
// MuSig2CombineSig sends a MuSig2CombineSigRequest to the remote signer and
656
// waits for the corresponding response.
657
//
658
// NOTE: This is part of the RemoteSignerRequests interface.
659
func (s *SignCoordinator) MuSig2CombineSig(_ context.Context,
660
        in *signrpc.MuSig2CombineSigRequest,
661
        _ ...grpc.CallOption) (*signrpc.MuSig2CombineSigResponse, error) {
4✔
662

4✔
663
        req := &walletrpc.SignCoordinatorRequest_MuSig2CombineSigRequest{
4✔
664
                MuSig2CombineSigRequest: in,
4✔
665
        }
4✔
666

4✔
667
        return processRequest(
4✔
668
                s, noTimeout,
4✔
669
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
8✔
670
                        return walletrpc.SignCoordinatorRequest{
4✔
671
                                RequestId:       reqId,
4✔
672
                                SignRequestType: req,
4✔
673
                        }
4✔
674
                },
4✔
675
                func(resp *signerResponse) *signrpc.MuSig2CombineSigResponse {
4✔
676
                        return resp.GetMuSig2CombineSigResponse()
4✔
677
                },
4✔
678
        )
679
}
680

681
// MuSig2CreateSession sends a MuSig2SessionRequest to the remote signer and
682
// waits for the corresponding response.
683
//
684
// NOTE: This is part of the RemoteSignerRequests interface.
685
func (s *SignCoordinator) MuSig2CreateSession(_ context.Context,
686
        in *signrpc.MuSig2SessionRequest,
687
        _ ...grpc.CallOption) (*signrpc.MuSig2SessionResponse, error) {
4✔
688

4✔
689
        req := &walletrpc.SignCoordinatorRequest_MuSig2SessionRequest{
4✔
690
                MuSig2SessionRequest: in,
4✔
691
        }
4✔
692

4✔
693
        return processRequest(
4✔
694
                s, noTimeout,
4✔
695
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
8✔
696
                        return walletrpc.SignCoordinatorRequest{
4✔
697
                                RequestId:       reqId,
4✔
698
                                SignRequestType: req,
4✔
699
                        }
4✔
700
                },
4✔
701
                func(resp *signerResponse) *signrpc.MuSig2SessionResponse {
4✔
702
                        return resp.GetMuSig2SessionResponse()
4✔
703
                },
4✔
704
        )
705
}
706

707
// MuSig2RegisterNonces sends a MuSig2RegisterNoncesRequest to the remote signer
708
// and waits for the corresponding response.
709
//
710
// NOTE: This is part of the RemoteSignerRequests interface.
711
func (s *SignCoordinator) MuSig2RegisterNonces(_ context.Context,
712
        in *signrpc.MuSig2RegisterNoncesRequest,
713
        _ ...grpc.CallOption) (*signrpc.MuSig2RegisterNoncesResponse,
714
        error) {
4✔
715

4✔
716
        req := &walletrpc.SignCoordinatorRequest_MuSig2RegisterNoncesRequest{
4✔
717
                MuSig2RegisterNoncesRequest: in,
4✔
718
        }
4✔
719

4✔
720
        type muSig2RegisterNoncesResp = *signrpc.MuSig2RegisterNoncesResponse
4✔
721

4✔
722
        return processRequest(
4✔
723
                s, noTimeout,
4✔
724
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
8✔
725
                        return walletrpc.SignCoordinatorRequest{
4✔
726
                                RequestId:       reqId,
4✔
727
                                SignRequestType: req,
4✔
728
                        }
4✔
729
                },
4✔
730
                func(resp *signerResponse) muSig2RegisterNoncesResp {
4✔
731
                        return resp.GetMuSig2RegisterNoncesResponse()
4✔
732
                },
4✔
733
        )
734
}
735

736
// MuSig2Sign sends a MuSig2SignRequest to the remote signer and waits for the
737
// corresponding response.
738
//
739
// NOTE: This is part of the RemoteSignerRequests interface.
740
func (s *SignCoordinator) MuSig2Sign(_ context.Context,
741
        in *signrpc.MuSig2SignRequest,
742
        _ ...grpc.CallOption) (*signrpc.MuSig2SignResponse, error) {
4✔
743

4✔
744
        req := &walletrpc.SignCoordinatorRequest_MuSig2SignRequest{
4✔
745
                MuSig2SignRequest: in,
4✔
746
        }
4✔
747

4✔
748
        return processRequest(
4✔
749
                s, noTimeout,
4✔
750
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
8✔
751
                        return walletrpc.SignCoordinatorRequest{
4✔
752
                                RequestId:       reqId,
4✔
753
                                SignRequestType: req,
4✔
754
                        }
4✔
755
                },
4✔
756
                func(resp *signerResponse) *signrpc.MuSig2SignResponse {
4✔
757
                        return resp.GetMuSig2SignResponse()
4✔
758
                },
4✔
759
        )
760
}
761

762
// SignMessage sends a SignMessageReq to the remote signer and waits for the
763
// corresponding response.
764
//
765
// NOTE: This is part of the RemoteSignerRequests interface.
766
func (s *SignCoordinator) SignMessage(_ context.Context,
767
        in *signrpc.SignMessageReq,
768
        _ ...grpc.CallOption) (*signrpc.SignMessageResp, error) {
4✔
769

4✔
770
        req := &walletrpc.SignCoordinatorRequest_SignMessageReq{
4✔
771
                SignMessageReq: in,
4✔
772
        }
4✔
773

4✔
774
        return processRequest(
4✔
775
                s, noTimeout,
4✔
776
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
8✔
777
                        return walletrpc.SignCoordinatorRequest{
4✔
778
                                RequestId:       reqId,
4✔
779
                                SignRequestType: req,
4✔
780
                        }
4✔
781
                },
4✔
782
                func(resp *signerResponse) *signrpc.SignMessageResp {
4✔
783
                        return resp.GetSignMessageResp()
4✔
784
                },
4✔
785
        )
786
}
787

788
// SignPsbt sends a SignPsbtRequest to the remote signer and waits for the
789
// corresponding response.
790
//
791
// NOTE: This is part of the RemoteSignerRequests interface.
792
func (s *SignCoordinator) SignPsbt(_ context.Context,
793
        in *walletrpc.SignPsbtRequest,
794
        _ ...grpc.CallOption) (*walletrpc.SignPsbtResponse, error) {
4✔
795

4✔
796
        req := &walletrpc.SignCoordinatorRequest_SignPsbtRequest{
4✔
797
                SignPsbtRequest: in,
4✔
798
        }
4✔
799

4✔
800
        return processRequest(
4✔
801
                s, noTimeout,
4✔
802
                func(reqId uint64) walletrpc.SignCoordinatorRequest {
8✔
803
                        return walletrpc.SignCoordinatorRequest{
4✔
804
                                RequestId:       reqId,
4✔
805
                                SignRequestType: req,
4✔
806
                        }
4✔
807
                },
4✔
808
                func(resp *signerResponse) *walletrpc.SignPsbtResponse {
4✔
809
                        return resp.GetSignPsbtResponse()
4✔
810
                },
4✔
811
        )
812
}
813

814
// processRequest is a generic function that sends a request to the remote
815
// signer and waits for the corresponding response. If a timeout is set, the
816
// function will limit the execution time of the entire function to the
817
// specified timeout. If it is not set, configured timeouts will be used for
818
// the individual operations within the function.
819
func processRequest[R comparable](s *SignCoordinator, timeout time.Duration,
820
        generateRequest func(uint64) walletrpc.SignCoordinatorRequest,
821
        extractResponse func(*signerResponse) R) (R, error) {
16✔
822

16✔
823
        var zero R
16✔
824

16✔
825
        done, err := s.registerRequest()
16✔
826
        if err != nil {
16✔
NEW
827
                return zero, err
×
NEW
828
        }
×
829
        defer done()
16✔
830

16✔
831
        startTime := time.Now()
16✔
832

16✔
833
        // If a timeout is enforced, we will wait for the connection using the
16✔
834
        // specified timeout. Otherwise, we will wait for the connection using
16✔
835
        // the configured connection timeout.
16✔
836
        if timeout != 0 {
32✔
837
                err = s.waitUntilConnectedWithTimeout(timeout)
16✔
838
        } else {
20✔
839
                err = s.WaitUntilConnected()
4✔
840
        }
4✔
841

842
        if err != nil {
17✔
843
                return zero, err
1✔
844
        }
1✔
845

846
        reqID := s.nextRequestID.Add(1)
16✔
847
        req := generateRequest(reqID)
16✔
848

16✔
849
        cleanUpChannel := s.createResponseChannel(reqID)
16✔
850
        defer cleanUpChannel()
16✔
851

16✔
852
        log.Debugf("Sending a %T to the remote signer with request ID %d",
16✔
853
                req.SignRequestType, reqID)
16✔
854

16✔
855
        log.Tracef("Request content: %v", formatSignCoordinatorMsg(&req))
16✔
856

16✔
857
        err = s.stream.Send(&req)
16✔
858
        if err != nil {
16✔
NEW
859
                return zero, err
×
NEW
860
        }
×
861

862
        var resp *walletrpc.SignCoordinatorResponse
16✔
863

16✔
864
        // If a timeout is enforced, we need to limit the entire execution time
16✔
865
        // of this function to the timeout. Therefore, we need to calculate the
16✔
866
        // remaining allowed execution time.
16✔
867
        // If no timeout is enforced, we will wait for the response using the
16✔
868
        // configured request timeout.
16✔
869
        if timeout != 0 {
32✔
870
                newTimeout := timeout - time.Since(startTime)
16✔
871

16✔
872
                if time.Since(startTime) > timeout {
16✔
NEW
873
                        return zero, ErrRequestTimeout
×
NEW
874
                }
×
875

876
                resp, err = s.getResponse(reqID, newTimeout)
16✔
877
        } else {
4✔
878
                resp, err = s.getResponse(reqID, s.requestTimeout)
4✔
879
        }
4✔
880

881
        if err != nil {
26✔
882
                return zero, err
10✔
883
        }
10✔
884

885
        rpcResp := extractResponse(resp)
10✔
886
        if rpcResp == zero {
10✔
NEW
887
                return zero, ErrUnexpectedResponse
×
NEW
888
        }
×
889

890
        return rpcResp, nil
10✔
891
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc