• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14000719599

21 Mar 2025 08:54PM UTC coverage: 58.717% (-10.3%) from 68.989%
14000719599

Pull #8754

github

web-flow
Merge 29f363f18 into 5235f3b24
Pull Request #8754: Add `Outbound` Remote Signer implementation

1562 of 2088 new or added lines in 41 files covered. (74.81%)

28126 existing lines in 464 files now uncovered.

97953 of 166822 relevant lines covered (58.72%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.07
/rpcperms/interceptor.go
1
package rpcperms
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btclog/v2"
11
        grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
12
        "github.com/lightningnetwork/lnd/lnrpc"
13
        "github.com/lightningnetwork/lnd/macaroons"
14
        "github.com/lightningnetwork/lnd/monitoring"
15
        "github.com/lightningnetwork/lnd/subscribe"
16
        "google.golang.org/grpc"
17
        "gopkg.in/macaroon-bakery.v2/bakery"
18
)
19

20
// rpcState is an enum that we use to keep track of the current RPC service
21
// state. This will transition as we go from startup to unlocking the wallet,
22
// and finally fully active. The values are assigned sequentially via iota, in
// the order in which the constants are declared below.
23
type rpcState uint8
24

25
const (
26
        // waitingToStart indicates that we're at the beginning of the startup
27
        // process. In a cluster environment this may mean that we're waiting to
28
        // become the leader in which case RPC calls will be disabled until
29
        // this instance has been elected as leader.
30
        waitingToStart rpcState = iota
31

32
        // walletNotCreated is the starting state if the RPC server is active,
33
        // but the wallet is not yet created. In this state we'll only allow
34
        // calls to the WalletUnlockerService.
35
        walletNotCreated
36

37
        // walletLocked indicates the RPC server is active, but the wallet is
38
        // locked. In this state we'll only allow calls to the
39
        // WalletUnlockerService.
40
        walletLocked
41

42
        // walletUnlocked means that the wallet has been unlocked, but the full
43
        // RPC server is not yet ready.
44
        walletUnlocked
45

46
        // allowRemoteSigner means that the wallet is unlocked, and that we're
47
        // waiting for the remote signer to connect before proceeding. Only
48
        // RPC calls to connect the remote signer are allowed during this state.
49
        allowRemoteSigner
50

51
        // rpcActive means that the RPC server is ready to accept calls.
52
        rpcActive
53

54
        // serverActive means that the lnd server is ready to accept calls.
55
        serverActive
56
)
57

58
var (
59
        // ErrWaitingToStart is returned if LND is still waiting to start,
60
        // possibly blocked until elected as the leader.
61
        ErrWaitingToStart = fmt.Errorf("waiting to start, RPC services not " +
62
                "available")
63

64
        // ErrNoWallet is returned if the wallet does not exist.
65
        ErrNoWallet = fmt.Errorf("wallet not created, create one to enable " +
66
                "full RPC access")
67

68
        // ErrWalletLocked is returned if the wallet is locked and any service
69
        // other than the WalletUnlocker is called.
70
        ErrWalletLocked = fmt.Errorf("wallet locked, unlock it to enable " +
71
                "full RPC access")
72

73
        // ErrWalletUnlocked is returned if the WalletUnlocker service is
74
        // called when the wallet has already been unlocked.
75
        ErrWalletUnlocked = fmt.Errorf("wallet already unlocked, " +
76
                "WalletUnlocker service is no longer available")
77

78
        // ErrAwaitingRemoteSigner is returned if an RPC call is made, other
79
        // than an RPC call to connect a remote signer, while LND is waiting for
80
        // a remote signer to connect.
81
        ErrAwaitingRemoteSigner = fmt.Errorf("waiting for remote signer to " +
82
                "connect before other RPC calls can be accepted")
83

84
        // ErrRPCStarting is returned if the wallet has been unlocked but the
85
        // RPC server is not yet ready to accept calls.
86
        ErrRPCStarting = fmt.Errorf("the RPC server is in the process of " +
87
                "starting up, but not yet ready to accept calls")
88

89
        // macaroonWhitelist defines methods that we don't require macaroons to
90
        // access. We also allow these methods to be called even if not all
91
        // mandatory middlewares are registered yet. If the wallet is locked
92
        // then a middleware cannot register itself, creating an impossible
93
        // situation. Also, a middleware might want to check the state of lnd
94
        // by calling the State service before it registers itself. So we also
95
        // need to exclude those calls from the mandatory middleware check.
        // The map is keyed by the full gRPC method name.
96
        macaroonWhitelist = map[string]struct{}{
97
                // We allow all calls to the WalletUnlocker without macaroons.
98
                "/lnrpc.WalletUnlocker/GenSeed":        {},
99
                "/lnrpc.WalletUnlocker/InitWallet":     {},
100
                "/lnrpc.WalletUnlocker/UnlockWallet":   {},
101
                "/lnrpc.WalletUnlocker/ChangePassword": {},
102

103
                // The State service must be available at all times, even
104
                // before we can check macaroons, so we whitelist it.
105
                "/lnrpc.State/SubscribeState": {},
106
                "/lnrpc.State/GetState":       {},
107
        }
108

109
        // allowRemoteSignerWhitelist defines methods that we allow to be called
110
        // when we are waiting for the remote signer to connect, i.e. in the
111
        // allowRemoteSigner state. The map is keyed by the full gRPC method
        // name.
112
        allowRemoteSignerWhitelist = map[string]struct{}{
113
                "/walletrpc.WalletKit/SignCoordinatorStreams": {},
114
        }
115
)
116

117
// InterceptorChain is a struct that can be added to the running GRPC server,
118
// intercepting API calls. This is useful for logging, enforcing permissions,
119
// supporting middleware etc. The following diagram shows the order of each
120
// interceptor in the chain and when exactly requests/responses are intercepted
121
// and forwarded to external middleware for approval/modification. Middleware in
122
// general can only intercept gRPC requests/responses that are sent by the
123
// client with a macaroon that contains a custom caveat that is supported by one
124
// of the registered middlewares.
125
//
126
//            |
127
//            | gRPC request from client
128
//            |
129
//        +---v--------------------------------+
130
//        |   InterceptorChain                 |
131
//        +-+----------------------------------+
132
//          | Log Interceptor                  |
133
//          +----------------------------------+
134
//          | RPC State Interceptor            |
135
//          +----------------------------------+
136
//          | Macaroon Interceptor             |
137
//          +----------------------------------+--------> +---------------------+
138
//          | RPC Macaroon Middleware Handler  |<-------- | External Middleware |
139
//          +----------------------------------+          |   - modify request  |
140
//          | Prometheus Interceptor           |          +---------------------+
141
//          +-+--------------------------------+
142
//            | validated gRPC request from client
143
//        +---v--------------------------------+
144
//        |   main gRPC server                 |
145
//        +---+--------------------------------+
146
//            |
147
//            | original gRPC response to client
148
//            |
149
//        +---v--------------------------------+--------> +---------------------+
150
//        |   RPC Macaroon Middleware Handler  |<-------- | External Middleware |
151
//        +---+--------------------------------+          |   - modify response |
152
//            |                                           +---------------------+
153
//            | edited gRPC response to client
154
//            v
155
type InterceptorChain struct {
156
        // lastRequestID is the ID of the last gRPC request or stream that was
157
        // intercepted by the middleware interceptor.
158
        //
159
        // NOTE: Must be used atomically!
160
        lastRequestID uint64
161

162
        // Required by the grpc-gateway/v2 library for forward compatibility.
163
        lnrpc.UnimplementedStateServer
164

        // started and stopped make sure Start and Stop each run their body
        // only once.
165
        started sync.Once
166
        stopped sync.Once
167

168
        // state is the current RPC state of our RPC server.
169
        state rpcState
170

171
        // ntfnServer is a subscription server we use to notify clients of the
172
        // State service when the state changes.
173
        ntfnServer *subscribe.Server
174

175
        // noMacaroons should be set true if we don't want to check macaroons.
176
        noMacaroons bool
177

178
        // svc is the macaroon service used to enforce permissions in case
179
        // macaroons are used.
180
        svc *macaroons.Service
181

182
        // permissionMap is the permissions to enforce if macaroons are used.
183
        permissionMap map[string][]bakery.Op
184

185
        // rpcsLog is the logger used to log calls to the RPCs intercepted.
186
        rpcsLog btclog.Logger
187

188
        // registeredMiddleware is a slice of all macaroon permission based RPC
189
        // middleware clients that are currently registered. The
190
        // registeredMiddlewareNames can be used to find the index of a specific
191
        // interceptor within the registeredMiddleware slice using the name of
192
        // the interceptor as the key. The reason for using these two separate
193
        // structures is so that the order in which interceptors are run is
194
        // the same as the order in which they were registered.
195
        registeredMiddleware []*MiddlewareHandler
196

197
        // registeredMiddlewareNames is a map of registered middleware names
198
        // to the index at which they are stored in the registeredMiddleware
199
        // slice.
200
        registeredMiddlewareNames map[string]int
201

202
        // mandatoryMiddleware is a list of all middleware that is considered to
203
        // be mandatory. If any of them is not registered then all RPC requests
204
        // (except for the macaroon white listed methods and the middleware
205
        // registration itself) are blocked. This is a security feature to make
206
        // sure that requests can't just go through unobserved/unaudited if a
207
        // middleware crashes.
208
        mandatoryMiddleware []string
209

        // quit is closed by Stop to signal that the chain is shutting down.
210
        quit chan struct{}
211
        sync.RWMutex
212
}
213

214
// A compile-time check to ensure that InterceptorChain fully implements the
215
// StateServer gRPC service.
216
var _ lnrpc.StateServer = (*InterceptorChain)(nil)
217

218
// NewInterceptorChain creates a new InterceptorChain. The chain starts out in
// the waitingToStart state with a fresh subscription server, empty permission
// and middleware-name maps and an open quit channel. The given logger,
// noMacaroons flag and mandatoryMiddleware list are stored as passed in.
219
func NewInterceptorChain(log btclog.Logger, noMacaroons bool,
220
        mandatoryMiddleware []string) *InterceptorChain {
3✔
221

3✔
222
        return &InterceptorChain{
3✔
223
                state:                     waitingToStart,
3✔
224
                ntfnServer:                subscribe.NewServer(),
3✔
225
                noMacaroons:               noMacaroons,
3✔
226
                permissionMap:             make(map[string][]bakery.Op),
3✔
227
                rpcsLog:                   log,
3✔
228
                registeredMiddlewareNames: make(map[string]int),
3✔
229
                mandatoryMiddleware:       mandatoryMiddleware,
3✔
230
                quit:                      make(chan struct{}),
3✔
231
        }
3✔
232
}
3✔
233

234
// Start starts the InterceptorChain, which is needed to start the state
235
// subscription server it powers. The subscription server is only started on
// the first call; any later call does nothing and returns nil.
236
func (r *InterceptorChain) Start() error {
3✔
237
        var err error
3✔
238
        r.started.Do(func() {
6✔
239
                err = r.ntfnServer.Start()
3✔
240
        })
3✔
241

242
        return err
3✔
243
}
244

245
// Stop stops the InterceptorChain and its internal state subscription server.
// It closes the quit channel before stopping the subscription server. Only the
// first call has an effect; any later call does nothing and returns nil.
246
func (r *InterceptorChain) Stop() error {
3✔
247
        var err error
3✔
248
        r.stopped.Do(func() {
6✔
249
                close(r.quit)
3✔
250
                err = r.ntfnServer.Stop()
3✔
251
        })
3✔
252

253
        return err
3✔
254
}
255

256
// SetWalletNotCreated moves the RPC state from waitingToStart to
257
// walletNotCreated and sends the new state to the state subscription server.
// The previous state is not checked, and an error from SendUpdate is ignored.
258
func (r *InterceptorChain) SetWalletNotCreated() {
3✔
259
        r.Lock()
3✔
260
        defer r.Unlock()
3✔
261

3✔
262
        r.state = walletNotCreated
3✔
263
        _ = r.ntfnServer.SendUpdate(r.state)
3✔
264
}
3✔
265

266
// SetWalletLocked moves the RPC state from walletNotCreated to
267
// walletLocked and sends the new state to the state subscription server. The
// previous state is not checked, and an error from SendUpdate is ignored.
268
func (r *InterceptorChain) SetWalletLocked() {
3✔
269
        r.Lock()
3✔
270
        defer r.Unlock()
3✔
271

3✔
272
        r.state = walletLocked
3✔
273
        _ = r.ntfnServer.SendUpdate(r.state)
3✔
274
}
3✔
275

276
// SetWalletUnlocked moves the RPC state from either walletNotCreated or
277
// walletLocked to walletUnlocked and sends the new state to the state
// subscription server. The previous state is not checked, and an error from
// SendUpdate is ignored.
278
func (r *InterceptorChain) SetWalletUnlocked() {
3✔
279
        r.Lock()
3✔
280
        defer r.Unlock()
3✔
281

3✔
282
        r.state = walletUnlocked
3✔
283
        _ = r.ntfnServer.SendUpdate(r.state)
3✔
284
}
3✔
285

286
// SetAllowRemoteSigner moves the RPC state from walletUnlocked to
287
// allowRemoteSigner and sends the new state to the state subscription server.
// The previous state is not checked, and an error from SendUpdate is ignored.
288
func (r *InterceptorChain) SetAllowRemoteSigner() {
3✔
289
        r.Lock()
3✔
290
        defer r.Unlock()
3✔
291

3✔
292
        r.state = allowRemoteSigner
3✔
293
        _ = r.ntfnServer.SendUpdate(r.state)
3✔
294
}
3✔
295

296
// SetRPCActive moves the RPC state from either walletUnlocked or
297
// allowRemoteSigner to rpcActive and sends the new state to the state
// subscription server. The previous state is not checked, and an error from
// SendUpdate is ignored.
298
func (r *InterceptorChain) SetRPCActive() {
3✔
299
        r.Lock()
3✔
300
        defer r.Unlock()
3✔
301

3✔
302
        r.state = rpcActive
3✔
303
        _ = r.ntfnServer.SendUpdate(r.state)
3✔
304
}
3✔
305

306
// SetServerActive moves the RPC state from rpcActive to serverActive and sends
// the new state to the state subscription server. The previous state is not
// checked, and an error from SendUpdate is ignored.
307
func (r *InterceptorChain) SetServerActive() {
3✔
308
        r.Lock()
3✔
309
        defer r.Unlock()
3✔
310

3✔
311
        r.state = serverActive
3✔
312
        _ = r.ntfnServer.SendUpdate(r.state)
3✔
313
}
3✔
314

315
// rpcStateToWalletState converts rpcState to lnrpc.WalletState. Returns
316
// WAITING_TO_START and an error if the given state is not one of the known
// rpcState constants.
317
func rpcStateToWalletState(state rpcState) (lnrpc.WalletState, error) {
3✔
318
        const defaultState = lnrpc.WalletState_WAITING_TO_START
3✔
319
        var walletState lnrpc.WalletState
3✔
320

3✔
321
        switch state {
3✔
322
        case waitingToStart:
3✔
323
                walletState = lnrpc.WalletState_WAITING_TO_START
3✔
324
        case walletNotCreated:
3✔
325
                walletState = lnrpc.WalletState_NON_EXISTING
3✔
326
        case walletLocked:
3✔
327
                walletState = lnrpc.WalletState_LOCKED
3✔
328
        case walletUnlocked:
3✔
329
                walletState = lnrpc.WalletState_UNLOCKED
3✔
330
        case allowRemoteSigner:
3✔
331
                walletState = lnrpc.WalletState_ALLOW_REMOTE_SIGNER
3✔
332
        case rpcActive:
3✔
333
                walletState = lnrpc.WalletState_RPC_ACTIVE
3✔
334
        case serverActive:
3✔
335
                walletState = lnrpc.WalletState_SERVER_ACTIVE
3✔
336

337
        default:
×
338
                return defaultState, fmt.Errorf("unknown wallet state %v", state)
×
339
        }
340

341
        return walletState, nil
3✔
342
}
343

344
// SubscribeState subscribes to the state of the wallet. The current wallet
345
// state will always be delivered immediately. After that, every state change
// that differs from the last state sent is forwarded to the stream. The call
// returns nil when the stream's context is canceled, the context's error if
// the context ends for any other reason, and an error if sending fails or the
// InterceptorChain is stopped.
346
//
347
// NOTE: Part of the StateService interface.
348
func (r *InterceptorChain) SubscribeState(_ *lnrpc.SubscribeStateRequest,
349
        stream lnrpc.State_SubscribeStateServer) error {
3✔
350

3✔
        // sendStateUpdate converts the internal state to its lnrpc
        // representation and sends it to the client.
351
        sendStateUpdate := func(state rpcState) error {
6✔
352
                walletState, err := rpcStateToWalletState(state)
3✔
353
                if err != nil {
3✔
354
                        return err
×
355
                }
×
356

357
                return stream.Send(&lnrpc.SubscribeStateResponse{
3✔
358
                        State: walletState,
3✔
359
                })
3✔
360
        }
361

362
        // Subscribe to state updates.
363
        client, err := r.ntfnServer.Subscribe()
3✔
364
        if err != nil {
3✔
365
                return err
×
366
        }
×
367
        defer client.Cancel()
3✔
368

3✔
369
        // Always start by sending the current state.
3✔
370
        r.RLock()
3✔
371
        state := r.state
3✔
372
        r.RUnlock()
3✔
373

3✔
374
        if err := sendStateUpdate(state); err != nil {
3✔
375
                return err
×
376
        }
×
377

378
        for {
6✔
379
                select {
3✔
380
                case e := <-client.Updates():
3✔
                        // NOTE(review): unchecked type assertion, this
                        // assumes only rpcState values are ever sent on
                        // ntfnServer. TODO confirm.
381
                        newState := e.(rpcState)
3✔
382

3✔
383
                        // Ignore already sent state.
3✔
384
                        if newState == state {
3✔
385
                                continue
×
386
                        }
387

388
                        state = newState
3✔
389
                        err := sendStateUpdate(state)
3✔
390
                        if err != nil {
5✔
391
                                return err
2✔
392
                        }
2✔
393

394
                // The response stream's context has been closed. A plain
395
                // cancellation is not treated as an error; any other reason,
396
                // such as an exceeded deadline, is returned as an error.
397
                case <-stream.Context().Done():
3✔
398
                        if errors.Is(stream.Context().Err(), context.Canceled) {
6✔
399
                                return nil
3✔
400
                        }
3✔
401
                        return stream.Context().Err()
×
402

                // The InterceptorChain itself is shutting down.
403
                case <-r.quit:
×
404
                        return fmt.Errorf("server exiting")
×
405
                }
406
        }
407
}
408

409
// GetState returns the current wallet state. The internal RPC state is read
// under the read lock and converted to its lnrpc.WalletState representation;
// an error is returned if that conversion fails.
410
func (r *InterceptorChain) GetState(_ context.Context,
411
        _ *lnrpc.GetStateRequest) (*lnrpc.GetStateResponse, error) {
3✔
412

3✔
413
        r.RLock()
3✔
414
        state := r.state
3✔
415
        r.RUnlock()
3✔
416

3✔
417
        walletState, err := rpcStateToWalletState(state)
3✔
418
        if err != nil {
3✔
419
                return nil, err
×
420
        }
×
421

422
        return &lnrpc.GetStateResponse{
3✔
423
                State: walletState,
3✔
424
        }, nil
3✔
425
}
426

427
// AddMacaroonService adds a macaroon service to the interceptor. After this is
428
// done every RPC call made will have to pass a valid macaroon to be accepted,
// except for calls to methods in macaroonWhitelist or when noMacaroons is set
// (see checkMacaroon).
429
func (r *InterceptorChain) AddMacaroonService(svc *macaroons.Service) {
3✔
430
        r.Lock()
3✔
431
        defer r.Unlock()
3✔
432

3✔
433
        r.svc = svc
3✔
434
}
3✔
435

436
// MacaroonService returns the currently registered macaroon service. This might
437
// be nil if none was registered (yet). The field is read under the read lock.
438
func (r *InterceptorChain) MacaroonService() *macaroons.Service {
3✔
439
        r.RLock()
3✔
440
        defer r.RUnlock()
3✔
441

3✔
442
        return r.svc
3✔
443
}
3✔
444

445
// AddPermission adds a new macaroon rule for the given method. An error is
// returned, and the existing rule is left untouched, if a rule for the method
// was already added.
446
func (r *InterceptorChain) AddPermission(method string, ops []bakery.Op) error {
3✔
447
        r.Lock()
3✔
448
        defer r.Unlock()
3✔
449

3✔
450
        if _, ok := r.permissionMap[method]; ok {
3✔
451
                return fmt.Errorf("detected duplicate macaroon constraints "+
×
452
                        "for path: %v", method)
×
453
        }
×
454

455
        r.permissionMap[method] = ops
3✔
456
        return nil
3✔
457
}
458

459
// Permissions returns the current set of macaroon permissions. The returned
// map and each of its slices are freshly allocated copies, so the caller can
// modify them without affecting the interceptor's own permission map.
460
func (r *InterceptorChain) Permissions() map[string][]bakery.Op {
3✔
461
        r.RLock()
3✔
462
        defer r.RUnlock()
3✔
463

3✔
464
        // Make a copy under the read lock to avoid races.
3✔
465
        c := make(map[string][]bakery.Op)
3✔
466
        for k, v := range r.permissionMap {
6✔
467
                s := make([]bakery.Op, len(v))
3✔
468
                copy(s, v)
3✔
469
                c[k] = s
3✔
470
        }
3✔
471

472
        return c
3✔
473
}
474

475
// RegisterMiddleware registers a new middleware that will handle request/
476
// response interception for all RPC messages that are initiated with a custom
477
// macaroon caveat. The name of the middleware is its unique identifier. Only
478
// one middleware can be registered for each custom caveat name. An error is
479
// returned if either the middleware name or the custom caveat name is already
// taken by a registered middleware.
480
func (r *InterceptorChain) RegisterMiddleware(mw *MiddlewareHandler) error {
3✔
481
        r.Lock()
3✔
482
        defer r.Unlock()
3✔
483

3✔
484
        // The name of the middleware is the unique identifier.
3✔
485
        _, ok := r.registeredMiddlewareNames[mw.middlewareName]
3✔
486
        if ok {
3✔
487
                return fmt.Errorf("a middleware with the name '%s' is already "+
×
488
                        "registered", mw.middlewareName)
×
489
        }
×
490

491
        // For now, we only want one middleware per custom caveat name. If we
492
        // allowed multiple middlewares handling the same caveat there would be
493
        // a need for extra call chaining logic, and they could overwrite each
494
        // other's responses.
495
        for _, middleware := range r.registeredMiddleware {
3✔
496
                if middleware.customCaveatName == mw.customCaveatName {
×
497
                        return fmt.Errorf("a middleware is already registered "+
×
498
                                "for the custom caveat name '%s': %v",
×
499
                                mw.customCaveatName, middleware.middlewareName)
×
500
                }
×
501
        }
502

        // Append the middleware and remember the index it was stored at.
503
        r.registeredMiddleware = append(r.registeredMiddleware, mw)
3✔
504
        index := len(r.registeredMiddleware) - 1
3✔
505
        r.registeredMiddlewareNames[mw.middlewareName] = index
3✔
506

3✔
507
        return nil
3✔
508
}
509

510
// RemoveMiddleware removes the middleware that is registered under the given
511
// middleware name. If no middleware with that name is registered, this is a
// no-op.
512
func (r *InterceptorChain) RemoveMiddleware(middlewareName string) {
3✔
513
        r.Lock()
3✔
514
        defer r.Unlock()
3✔
515

3✔
516
        log.Debugf("Removing middleware %s", middlewareName)
3✔
517

3✔
518
        index, ok := r.registeredMiddlewareNames[middlewareName]
3✔
519
        if !ok {
3✔
520
                return
×
521
        }
×
522
        delete(r.registeredMiddlewareNames, middlewareName)
3✔
523

3✔
        // Cut the entry out of the slice, keeping the order of the remaining
        // middlewares.
524
        r.registeredMiddleware = append(
3✔
525
                r.registeredMiddleware[:index],
3✔
526
                r.registeredMiddleware[index+1:]...,
3✔
527
        )
3✔
528

3✔
529
        // Re-initialise the middleware look-up map with the updated indexes.
3✔
530
        r.registeredMiddlewareNames = make(map[string]int)
3✔
531
        for i, mw := range r.registeredMiddleware {
3✔
532
                r.registeredMiddlewareNames[mw.middlewareName] = i
×
533
        }
×
534
}
535

536
// CustomCaveatSupported makes sure a middleware that handles the given custom
537
// caveat name is registered. If none is, an error is returned, signalling to
538
// the macaroon bakery and its validator to reject macaroons that have a custom
539
// caveat with that name.
540
//
541
// NOTE: This method is part of the macaroons.CustomCaveatAcceptor interface.
542
func (r *InterceptorChain) CustomCaveatSupported(customCaveatName string) error {
3✔
543
        r.RLock()
3✔
544
        defer r.RUnlock()
3✔
545

3✔
546
        // We only accept requests with a custom caveat if we also have a
3✔
547
        // middleware registered that handles that custom caveat. That is
3✔
548
        // crucial for security! Otherwise a request with an encumbered (=has
3✔
549
        // restricted permissions based upon the custom caveat condition)
3✔
550
        // macaroon would not be validated against the limitations that the
3✔
551
        // custom caveat implies. Since the map is keyed by the _name_ of the
3✔
552
        // middleware, we need to loop through all of them to see if one has
3✔
553
        // the given custom macaroon caveat name.
3✔
554
        for _, middleware := range r.registeredMiddleware {
6✔
555
                if middleware.customCaveatName == customCaveatName {
6✔
556
                        return nil
3✔
557
                }
3✔
558
        }
559

560
        return fmt.Errorf("cannot accept macaroon with custom caveat '%s', "+
3✔
561
                "no middleware registered to handle it", customCaveatName)
3✔
562
}
563

564
// CreateServerOpts creates the GRPC server options that can be added to a GRPC
565
// server in order to add this InterceptorChain. The unary and stream
// interceptors are each chained in the same order: error logging, RPC state
// check, macaroon check, custom caveat middleware and finally whatever
// monitoring.GetPromInterceptors returns.
566
func (r *InterceptorChain) CreateServerOpts() []grpc.ServerOption {
3✔
567
        var unaryInterceptors []grpc.UnaryServerInterceptor
3✔
568
        var strmInterceptors []grpc.StreamServerInterceptor
3✔
569

3✔
570
        // The first interceptors we'll add to the chain is our logging
3✔
571
        // interceptors, so we can automatically log all errors that happen
3✔
572
        // during RPC calls.
3✔
573
        unaryInterceptors = append(
3✔
574
                unaryInterceptors, errorLogUnaryServerInterceptor(r.rpcsLog),
3✔
575
        )
3✔
576
        strmInterceptors = append(
3✔
577
                strmInterceptors, errorLogStreamServerInterceptor(r.rpcsLog),
3✔
578
        )
3✔
579

3✔
580
        // Next we'll add our RPC state check interceptors, that will check
3✔
581
        // whether the attempted call is allowed in the current state.
3✔
582
        unaryInterceptors = append(
3✔
583
                unaryInterceptors, r.rpcStateUnaryServerInterceptor(),
3✔
584
        )
3✔
585
        strmInterceptors = append(
3✔
586
                strmInterceptors, r.rpcStateStreamServerInterceptor(),
3✔
587
        )
3✔
588

3✔
589
        // We'll add the macaroon interceptors. If macaroons aren't disabled,
3✔
590
        // then these interceptors will enforce macaroon authentication.
3✔
591
        unaryInterceptors = append(
3✔
592
                unaryInterceptors, r.MacaroonUnaryServerInterceptor(),
3✔
593
        )
3✔
594
        strmInterceptors = append(
3✔
595
                strmInterceptors, r.MacaroonStreamServerInterceptor(),
3✔
596
        )
3✔
597

3✔
598
        // Next, we'll add the interceptors for our custom macaroon caveat based
3✔
599
        // middleware.
3✔
600
        unaryInterceptors = append(
3✔
601
                unaryInterceptors, r.middlewareUnaryServerInterceptor(),
3✔
602
        )
3✔
603
        strmInterceptors = append(
3✔
604
                strmInterceptors, r.middlewareStreamServerInterceptor(),
3✔
605
        )
3✔
606

3✔
607
        // Get interceptors for Prometheus to gather gRPC performance metrics.
3✔
608
        // If monitoring is disabled, GetPromInterceptors() will return empty
3✔
609
        // slices.
3✔
610
        promUnaryInterceptors, promStrmInterceptors :=
3✔
611
                monitoring.GetPromInterceptors()
3✔
612

3✔
613
        // Concatenate the slices of unary and stream interceptors respectively.
3✔
614
        unaryInterceptors = append(unaryInterceptors, promUnaryInterceptors...)
3✔
615
        strmInterceptors = append(strmInterceptors, promStrmInterceptors...)
3✔
616

3✔
617
        // Create server options from the interceptors we just set up.
3✔
618
        chainedUnary := grpc_middleware.WithUnaryServerChain(
3✔
619
                unaryInterceptors...,
3✔
620
        )
3✔
621
        chainedStream := grpc_middleware.WithStreamServerChain(
3✔
622
                strmInterceptors...,
3✔
623
        )
3✔
624
        serverOpts := []grpc.ServerOption{chainedUnary, chainedStream}
3✔
625

3✔
626
        return serverOpts
3✔
627
}
3✔
628

629
// errorLogUnaryServerInterceptor is a simple UnaryServerInterceptor that will
630
// automatically log any errors that occur when serving a client's unary
631
// request. The handler's response and error are passed back to the caller
// unchanged.
632
func errorLogUnaryServerInterceptor(logger btclog.Logger) grpc.UnaryServerInterceptor {
3✔
633
        return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
3✔
634
                handler grpc.UnaryHandler) (interface{}, error) {
6✔
635

3✔
636
                resp, err := handler(ctx, req)
3✔
637
                if err != nil {
6✔
638
                        // TODO(roasbeef): also log request details?
3✔
639
                        logger.Errorf("[%v]: %v", info.FullMethod, err)
3✔
640
                }
3✔
641

642
                return resp, err
3✔
643
        }
644
}
645

646
// errorLogStreamServerInterceptor is a simple StreamServerInterceptor that
647
// will log any errors that occur while processing a client or server streaming
648
// RPC. The handler's error is passed back to the caller unchanged.
649
func errorLogStreamServerInterceptor(logger btclog.Logger) grpc.StreamServerInterceptor {
3✔
650
        return func(srv interface{}, ss grpc.ServerStream,
3✔
651
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
6✔
652

3✔
653
                err := handler(srv, ss)
3✔
654
                if err != nil {
6✔
655
                        logger.Errorf("[%v]: %v", info.FullMethod, err)
3✔
656
                }
3✔
657

658
                return err
3✔
659
        }
660
}
661

662
// checkMacaroon validates that the context contains the macaroon needed to
663
// invoke the given RPC method. The call is allowed without any macaroon check
// if noMacaroons is set or the method is in macaroonWhitelist. Otherwise an
// error is returned if no macaroon service has been added yet or if no
// permissions are known for the method, and the final verdict is whatever the
// validator for the method returns.
664
func (r *InterceptorChain) checkMacaroon(ctx context.Context,
665
        fullMethod string) error {
3✔
666

3✔
667
        // If noMacaroons is set, we'll always allow the call.
3✔
668
        if r.noMacaroons {
3✔
669
                return nil
×
670
        }
×
671

672
        // Check whether the method is whitelisted, if so we'll allow it
673
        // regardless of macaroons.
674
        _, ok := macaroonWhitelist[fullMethod]
3✔
675
        if ok {
6✔
676
                return nil
3✔
677
        }
3✔
678

679
        r.RLock()
3✔
680
        svc := r.svc
3✔
681
        r.RUnlock()
3✔
682

3✔
683
        // If the macaroon service is not yet active, we cannot allow
3✔
684
        // the call.
3✔
685
        if svc == nil {
3✔
686
                return fmt.Errorf("unable to determine macaroon permissions")
×
687
        }
×
688

        // Look up the permissions required for this method under the read
        // lock.
689
        r.RLock()
3✔
690
        uriPermissions, ok := r.permissionMap[fullMethod]
3✔
691
        r.RUnlock()
3✔
692
        if !ok {
3✔
693
                return fmt.Errorf("%s: unknown permissions required for method",
×
694
                        fullMethod)
×
695
        }
×
696

697
        // Find out if there is an external validator registered for
698
        // this method. Fall back to the internal one if there isn't.
699
        validator, ok := svc.ExternalValidators[fullMethod]
3✔
700
        if !ok {
6✔
701
                validator = svc
3✔
702
        }
3✔
703

704
        // Now that we know what validator to use, let it do its work.
705
        return validator.ValidateMacaroon(ctx, uriPermissions, fullMethod)
3✔
706
}
707

708
// MacaroonUnaryServerInterceptor is a GRPC interceptor that checks whether the
709
// request is authorized by the included macaroons. The handler is only invoked
// if checkMacaroon succeeds; otherwise a nil response and the check's error
// are returned.
710
func (r *InterceptorChain) MacaroonUnaryServerInterceptor() grpc.UnaryServerInterceptor {
3✔
711
        return func(ctx context.Context, req interface{},
3✔
712
                info *grpc.UnaryServerInfo,
3✔
713
                handler grpc.UnaryHandler) (interface{}, error) {
6✔
714

3✔
715
                // Check macaroons.
3✔
716
                if err := r.checkMacaroon(ctx, info.FullMethod); err != nil {
6✔
717
                        return nil, err
3✔
718
                }
3✔
719

720
                return handler(ctx, req)
3✔
721
        }
722
}
723

724
// MacaroonStreamServerInterceptor is a GRPC interceptor that checks whether
725
// the request is authorized by the included macaroons.
726
func (r *InterceptorChain) MacaroonStreamServerInterceptor() grpc.StreamServerInterceptor {
3✔
727
        return func(srv interface{}, ss grpc.ServerStream,
3✔
728
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
6✔
729

3✔
730
                // Check macaroons.
3✔
731
                err := r.checkMacaroon(ss.Context(), info.FullMethod)
3✔
732
                if err != nil {
6✔
733
                        return err
3✔
734
                }
3✔
735

736
                return handler(srv, ss)
3✔
737
        }
738
}
739

740
// checkRPCState checks whether a call to the given server is allowed in the
741
// current RPC state.
742
func (r *InterceptorChain) checkRPCState(srv interface{},
743
        fullMethod string) error {
3✔
744

3✔
745
        // The StateService is being accessed, we allow the call regardless of
3✔
746
        // the current state.
3✔
747
        _, ok := srv.(lnrpc.StateServer)
3✔
748
        if ok {
6✔
749
                return nil
3✔
750
        }
3✔
751

752
        r.RLock()
3✔
753
        state := r.state
3✔
754
        r.RUnlock()
3✔
755

3✔
756
        switch state {
3✔
757
        // Do not accept any RPC calls (unless to the state service) until LND
758
        // has not started.
759
        case waitingToStart:
×
760
                return ErrWaitingToStart
×
761

762
        // If the wallet does not exists, only calls to the WalletUnlocker are
763
        // accepted.
764
        case walletNotCreated:
3✔
765
                _, ok := srv.(lnrpc.WalletUnlockerServer)
3✔
766
                if !ok {
3✔
767
                        return ErrNoWallet
×
768
                }
×
769

770
        // If the wallet is locked, only calls to the WalletUnlocker are
771
        // accepted.
772
        case walletLocked:
3✔
773
                _, ok := srv.(lnrpc.WalletUnlockerServer)
3✔
774
                if !ok {
3✔
775
                        return ErrWalletLocked
×
776
                }
×
777

778
        // If the wallet is unlocked, but the RPC not yet active, we reject.
779
        case walletUnlocked:
1✔
780
                _, ok := srv.(lnrpc.WalletUnlockerServer)
1✔
781
                if ok {
1✔
782
                        return ErrWalletUnlocked
×
783
                }
×
784

785
                return ErrRPCStarting
1✔
786

787
        // If lnd is waiting for the remote signer to connect, we only allow
788
        // calls to the remote signer.
789
        case allowRemoteSigner:
3✔
790
                _, ok := srv.(lnrpc.WalletUnlockerServer)
3✔
791
                if ok {
3✔
NEW
792
                        return ErrWalletUnlocked
×
NEW
793
                }
×
794

795
                // As we only allow calls to connect the remote signer until the
796
                // full rpc server is active, we check whether the method is
797
                // whitelisted or not.
798
                _, ok = allowRemoteSignerWhitelist[fullMethod]
3✔
799
                if !ok {
3✔
NEW
800
                        return ErrAwaitingRemoteSigner
×
NEW
801
                }
×
802

803
        // If the RPC server or lnd server is active, we allow calls to any
804
        // service except the WalletUnlocker.
805
        case rpcActive, serverActive:
3✔
806
                _, ok := srv.(lnrpc.WalletUnlockerServer)
3✔
807
                if ok {
3✔
808
                        return ErrWalletUnlocked
×
809
                }
×
810

811
        default:
×
812
                return fmt.Errorf("unknown RPC state: %v", state)
×
813
        }
814

815
        return nil
3✔
816
}
817

818
// rpcStateUnaryServerInterceptor is a GRPC interceptor that checks whether
819
// calls to the given gGRPC server is allowed in the current rpc state.
820
func (r *InterceptorChain) rpcStateUnaryServerInterceptor() grpc.UnaryServerInterceptor {
3✔
821
        return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
3✔
822
                handler grpc.UnaryHandler) (interface{}, error) {
6✔
823

3✔
824
                method := info.FullMethod
3✔
825
                r.rpcsLog.Debugf("[%v] requested", method)
3✔
826

3✔
827
                if err := r.checkRPCState(info.Server, method); err != nil {
3✔
828
                        return nil, err
×
829
                }
×
830

831
                return handler(ctx, req)
3✔
832
        }
833
}
834

835
// rpcStateStreamServerInterceptor is a GRPC interceptor that checks whether
836
// calls to the given gGRPC server is allowed in the current rpc state.
837
func (r *InterceptorChain) rpcStateStreamServerInterceptor() grpc.StreamServerInterceptor {
3✔
838
        return func(srv interface{}, ss grpc.ServerStream,
3✔
839
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
6✔
840

3✔
841
                r.rpcsLog.Debugf("[%v] requested", info.FullMethod)
3✔
842

3✔
843
                if err := r.checkRPCState(srv, info.FullMethod); err != nil {
4✔
844
                        return err
1✔
845
                }
1✔
846

847
                return handler(srv, ss)
3✔
848
        }
849
}
850

851
// middlewareUnaryServerInterceptor is a unary gRPC interceptor that intercepts
852
// all requests and responses that are sent with a macaroon containing a custom
853
// caveat condition that is handled by registered middleware.
854
func (r *InterceptorChain) middlewareUnaryServerInterceptor() grpc.UnaryServerInterceptor {
3✔
855
        return func(ctx context.Context,
3✔
856
                req interface{}, info *grpc.UnaryServerInfo,
3✔
857
                handler grpc.UnaryHandler) (interface{}, error) {
6✔
858

3✔
859
                // Make sure we don't allow any requests through if one of the
3✔
860
                // mandatory middlewares is missing.
3✔
861
                fullMethod := info.FullMethod
3✔
862
                if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
3✔
863
                        return nil, err
×
864
                }
×
865

866
                // If there is no middleware registered, we don't need to
867
                // intercept anything.
868
                if !r.middlewareRegistered() {
6✔
869
                        return handler(ctx, req)
3✔
870
                }
3✔
871

872
                requestID := atomic.AddUint64(&r.lastRequestID, 1)
3✔
873
                req, err := r.interceptMessage(
3✔
874
                        ctx, TypeRequest, requestID, false, info.FullMethod,
3✔
875
                        req,
3✔
876
                )
3✔
877
                if err != nil {
3✔
878
                        return nil, err
×
879
                }
×
880

881
                // Call the handler, which executes the request against lnd.
882
                lndResp, lndErr := handler(ctx, req)
3✔
883
                if lndErr != nil {
6✔
884
                        // The call to lnd ended in an error and not a normal
3✔
885
                        // proto message response. Send the error to the
3✔
886
                        // interceptor as well to inform about the abnormal
3✔
887
                        // termination of the stream and to give the option to
3✔
888
                        // replace the error message with a custom one.
3✔
889
                        replacedErr, err := r.interceptMessage(
3✔
890
                                ctx, TypeResponse, requestID, false,
3✔
891
                                info.FullMethod, lndErr,
3✔
892
                        )
3✔
893
                        if err != nil {
3✔
894
                                return nil, err
×
895
                        }
×
896
                        return lndResp, replacedErr.(error)
3✔
897
                }
898

899
                return r.interceptMessage(
3✔
900
                        ctx, TypeResponse, requestID, false, info.FullMethod,
3✔
901
                        lndResp,
3✔
902
                )
3✔
903
        }
904
}
905

906
// middlewareStreamServerInterceptor is a streaming gRPC interceptor that
907
// intercepts all requests and responses that are sent with a macaroon
908
// containing a custom caveat condition that is handled by registered
909
// middleware.
910
func (r *InterceptorChain) middlewareStreamServerInterceptor() grpc.StreamServerInterceptor {
3✔
911
        return func(srv interface{},
3✔
912
                ss grpc.ServerStream, info *grpc.StreamServerInfo,
3✔
913
                handler grpc.StreamHandler) error {
6✔
914

3✔
915
                // Don't intercept the interceptor itself which is a streaming
3✔
916
                // RPC too!
3✔
917
                fullMethod := info.FullMethod
3✔
918
                if fullMethod == lnrpc.RegisterRPCMiddlewareURI {
6✔
919
                        return handler(srv, ss)
3✔
920
                }
3✔
921

922
                // Make sure we don't allow any requests through if one of the
923
                // mandatory middlewares is missing. We add this check here to
924
                // make sure the middleware registration itself can still be
925
                // called.
926
                if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
3✔
927
                        return err
×
928
                }
×
929

930
                // If there is no middleware registered, we don't need to
931
                // intercept anything.
932
                if !r.middlewareRegistered() {
6✔
933
                        return handler(srv, ss)
3✔
934
                }
3✔
935

936
                // To give the middleware a chance to accept or reject the
937
                // establishment of the stream itself (and not only when the
938
                // first message is sent on the stream), we send an intercept
939
                // request for the stream auth now:
940
                msg, err := NewStreamAuthInterceptionRequest(
3✔
941
                        ss.Context(), info.FullMethod,
3✔
942
                )
3✔
943
                if err != nil {
3✔
944
                        return err
×
945
                }
×
946

947
                requestID := atomic.AddUint64(&r.lastRequestID, 1)
3✔
948
                err = r.acceptStream(requestID, msg)
3✔
949
                if err != nil {
3✔
950
                        return err
×
951
                }
×
952

953
                wrappedSS := &serverStreamWrapper{
3✔
954
                        ServerStream: ss,
3✔
955
                        requestID:    requestID,
3✔
956
                        fullMethod:   info.FullMethod,
3✔
957
                        interceptor:  r,
3✔
958
                }
3✔
959

3✔
960
                // Call the stream handler, which will block as long as the
3✔
961
                // stream is alive.
3✔
962
                lndErr := handler(srv, wrappedSS)
3✔
963
                if lndErr != nil {
6✔
964
                        // This is an error being returned from lnd. Send it to
3✔
965
                        // the interceptor as well to inform about the abnormal
3✔
966
                        // termination of the stream and to give the option to
3✔
967
                        // replace the error message with a custom one.
3✔
968
                        replacedErr, err := r.interceptMessage(
3✔
969
                                ss.Context(), TypeResponse, requestID,
3✔
970
                                true, info.FullMethod, lndErr,
3✔
971
                        )
3✔
972
                        if err != nil {
3✔
973
                                return err
×
974
                        }
×
975

976
                        return replacedErr.(error)
3✔
977
                }
978

979
                // Normal/successful termination of the stream.
980
                return nil
3✔
981
        }
982
}
983

984
// checkMandatoryMiddleware makes sure that each of the middlewares declared as
985
// mandatory is currently registered.
986
func (r *InterceptorChain) checkMandatoryMiddleware(fullMethod string) error {
3✔
987
        r.RLock()
3✔
988
        defer r.RUnlock()
3✔
989

3✔
990
        // Allow calls that are whitelisted for macaroons as well, otherwise we
3✔
991
        // get into impossible situations where the wallet is locked but the
3✔
992
        // unlock call is denied because the middleware isn't registered. But
3✔
993
        // the middleware cannot register itself because the wallet is locked.
3✔
994
        if _, ok := macaroonWhitelist[fullMethod]; ok {
6✔
995
                return nil
3✔
996
        }
3✔
997

998
        // Not a white listed call so make sure every mandatory middleware is
999
        // currently connected to lnd.
1000
        for _, name := range r.mandatoryMiddleware {
3✔
1001
                if _, ok := r.registeredMiddlewareNames[name]; !ok {
×
1002
                        return fmt.Errorf("mandatory middleware '%s' is "+
×
1003
                                "currently not registered, not allowing any "+
×
1004
                                "RPC calls", name)
×
1005
                }
×
1006
        }
1007

1008
        return nil
3✔
1009
}
1010

1011
// middlewareRegistered returns true if there is at least one middleware
1012
// currently registered.
1013
func (r *InterceptorChain) middlewareRegistered() bool {
3✔
1014
        r.RLock()
3✔
1015
        defer r.RUnlock()
3✔
1016

3✔
1017
        return len(r.registeredMiddleware) > 0
3✔
1018
}
3✔
1019

1020
// acceptStream sends an intercept request to all middlewares that have
1021
// registered for it. This means either a middleware has requested read-only
1022
// access or the request actually has a macaroon with a caveat the middleware
1023
// registered for.
1024
func (r *InterceptorChain) acceptStream(requestID uint64,
1025
        msg *InterceptionRequest) error {
3✔
1026

3✔
1027
        r.RLock()
3✔
1028
        defer r.RUnlock()
3✔
1029

3✔
1030
        for _, middleware := range r.registeredMiddleware {
6✔
1031
                // If there is a custom caveat in the macaroon, make sure the
3✔
1032
                // middleware registered for it. Or if a middleware registered
3✔
1033
                // for read-only mode, it also gets the request.
3✔
1034
                hasCustomCaveat := macaroons.HasCustomCaveat(
3✔
1035
                        msg.Macaroon, middleware.customCaveatName,
3✔
1036
                )
3✔
1037
                if !hasCustomCaveat && !middleware.readOnly {
3✔
1038
                        continue
×
1039
                }
1040

1041
                msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
3✔
1042
                        msg.Macaroon, middleware.customCaveatName,
3✔
1043
                )
3✔
1044

3✔
1045
                resp, err := middleware.intercept(requestID, msg)
3✔
1046

3✔
1047
                // Error during interception itself.
3✔
1048
                if err != nil {
3✔
1049
                        return err
×
1050
                }
×
1051

1052
                // Error returned from middleware client.
1053
                if resp.err != nil {
3✔
1054
                        return resp.err
×
1055
                }
×
1056
        }
1057

1058
        return nil
3✔
1059
}
1060

1061
// interceptMessage sends out an intercept request for an RPC response. Since
1062
// middleware that hasn't registered for the read-only mode has the option to
1063
// overwrite/replace the message, this needs to be handled differently than the
1064
// auth path above.
1065
func (r *InterceptorChain) interceptMessage(ctx context.Context,
1066
        interceptType InterceptType, requestID uint64, isStream bool,
1067
        fullMethod string, m interface{}) (interface{}, error) {
3✔
1068

3✔
1069
        r.RLock()
3✔
1070
        defer r.RUnlock()
3✔
1071

3✔
1072
        currentMessage := m
3✔
1073
        for _, middleware := range r.registeredMiddleware {
6✔
1074
                msg, err := NewMessageInterceptionRequest(
3✔
1075
                        ctx, interceptType, isStream, fullMethod,
3✔
1076
                        currentMessage,
3✔
1077
                )
3✔
1078
                if err != nil {
3✔
1079
                        return nil, err
×
1080
                }
×
1081

1082
                // If there is a custom caveat in the macaroon, make sure the
1083
                // middleware registered for it. Or if a middleware registered
1084
                // for read-only mode, it also gets the request.
1085
                hasCustomCaveat := macaroons.HasCustomCaveat(
3✔
1086
                        msg.Macaroon, middleware.customCaveatName,
3✔
1087
                )
3✔
1088
                if !hasCustomCaveat && !middleware.readOnly {
6✔
1089
                        continue
3✔
1090
                }
1091

1092
                msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
3✔
1093
                        msg.Macaroon, middleware.customCaveatName,
3✔
1094
                )
3✔
1095

3✔
1096
                resp, err := middleware.intercept(requestID, msg)
3✔
1097

3✔
1098
                // Error during interception itself.
3✔
1099
                if err != nil {
3✔
1100
                        return nil, err
×
1101
                }
×
1102

1103
                // Error returned from middleware client.
1104
                if resp.err != nil {
3✔
1105
                        return nil, resp.err
×
1106
                }
×
1107

1108
                // The message was replaced, make sure the next middleware in
1109
                // line receives the updated message.
1110
                if !middleware.readOnly && resp.replace {
6✔
1111
                        currentMessage = resp.replacement
3✔
1112
                }
3✔
1113
        }
1114

1115
        return currentMessage, nil
3✔
1116
}
1117

1118
// serverStreamWrapper is a struct that wraps a server stream in a way that all
1119
// requests and responses can be intercepted individually.
1120
type serverStreamWrapper struct {
1121
        // ServerStream is the stream that's being wrapped.
1122
        grpc.ServerStream
1123

1124
        requestID uint64
1125

1126
        fullMethod string
1127

1128
        interceptor *InterceptorChain
1129
}
1130

1131
// SendMsg is called when lnd sends a message to the client. This is wrapped to
1132
// intercept streaming RPC responses.
1133
func (w *serverStreamWrapper) SendMsg(m interface{}) error {
3✔
1134
        newMsg, err := w.interceptor.interceptMessage(
3✔
1135
                w.ServerStream.Context(), TypeResponse, w.requestID, true,
3✔
1136
                w.fullMethod, m,
3✔
1137
        )
3✔
1138
        if err != nil {
3✔
1139
                return err
×
1140
        }
×
1141

1142
        return w.ServerStream.SendMsg(newMsg)
3✔
1143
}
1144

1145
// RecvMsg is called when lnd wants to receive a message from the client. This
1146
// is wrapped to intercept streaming RPC requests.
1147
func (w *serverStreamWrapper) RecvMsg(m interface{}) error {
3✔
1148
        err := w.ServerStream.RecvMsg(m)
3✔
1149
        if err != nil {
3✔
1150
                return err
×
1151
        }
×
1152

1153
        req, err := w.interceptor.interceptMessage(
3✔
1154
                w.ServerStream.Context(), TypeRequest, w.requestID, true,
3✔
1155
                w.fullMethod, m,
3✔
1156
        )
3✔
1157
        if err != nil {
3✔
1158
                return err
×
1159
        }
×
1160

1161
        return replaceProtoMsg(m, req)
3✔
1162
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc