• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13974489001

20 Mar 2025 04:32PM UTC coverage: 56.292% (-2.9%) from 59.168%
13974489001

Pull #8754

github

web-flow
Merge aed149e6b into ea050d06f
Pull Request #8754: Add `Outbound` Remote Signer implementation

594 of 1713 new or added lines in 26 files covered. (34.68%)

23052 existing lines in 272 files now uncovered.

105921 of 188165 relevant lines covered (56.29%)

23796.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/rpcperms/interceptor.go
1
package rpcperms
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btclog/v2"
11
        grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
12
        "github.com/lightningnetwork/lnd/lnrpc"
13
        "github.com/lightningnetwork/lnd/macaroons"
14
        "github.com/lightningnetwork/lnd/monitoring"
15
        "github.com/lightningnetwork/lnd/subscribe"
16
        "google.golang.org/grpc"
17
        "gopkg.in/macaroon-bakery.v2/bakery"
18
)
19

20
// rpcState is an enum that we use to keep track of the current RPC service
21
// state. This will transition as we go from startup to unlocking the wallet,
22
// and finally fully active.
23
type rpcState uint8
24

25
const (
26
        // waitingToStart indicates that we're at the beginning of the startup
27
        // process. In a cluster environment this may mean that we're waiting to
28
        // become the leader in which case RPC calls will be disabled until
29
        // this instance has been elected as leader.
30
        waitingToStart rpcState = iota
31

32
        // walletNotCreated is the starting state if the RPC server is active,
33
        // but the wallet is not yet created. In this state we'll only allow
34
        // calls to the WalletUnlockerService.
35
        walletNotCreated
36

37
        // walletLocked indicates the RPC server is active, but the wallet is
38
        // locked. In this state we'll only allow calls to the
39
        // WalletUnlockerService.
40
        walletLocked
41

42
        // walletUnlocked means that the wallet has been unlocked, but the full
43
        // RPC server is not yet ready.
44
        walletUnlocked
45

46
        // allowRemoteSigner means that the wallet is unlocked, and that we're
47
        // waiting for the remote signer to connect before proceeding. Only
48
        // rpc calls to connect the remote signer are allowed during this state.
49
        allowRemoteSigner
50

51
        // rpcActive means that the RPC server is ready to accept calls.
52
        rpcActive
53

54
        // serverActive means that the lnd server is ready to accept calls.
55
        serverActive
56
)
57

58
var (
59
        // ErrWaitingToStart is returned if LND is still waiting to start,
60
        // possibly blocked until elected as the leader.
61
        ErrWaitingToStart = fmt.Errorf("waiting to start, RPC services not " +
62
                "available")
63

64
        // ErrNoWallet is returned if the wallet does not exist.
65
        ErrNoWallet = fmt.Errorf("wallet not created, create one to enable " +
66
                "full RPC access")
67

68
        // ErrWalletLocked is returned if the wallet is locked and any service
69
        // other than the WalletUnlocker is called.
70
        ErrWalletLocked = fmt.Errorf("wallet locked, unlock it to enable " +
71
                "full RPC access")
72

73
        // ErrWalletUnlocked is returned if the WalletUnlocker service is
74
        // called when the wallet already has been unlocked.
75
        ErrWalletUnlocked = fmt.Errorf("wallet already unlocked, " +
76
                "WalletUnlocker service is no longer available")
77

78
        // ErrAwaitingRemoteSigner is returned if an RPC call is made, other
79
        // than an RPC call to connect a remote signer, while LND is waiting for
80
        // a remote signer to connect.
81
        ErrAwaitingRemoteSigner = fmt.Errorf("waiting for remote signer to " +
82
                "connect before other RPC calls can be accepted")
83

84
        // ErrRPCStarting is returned if the wallet has been unlocked but the
85
        // RPC server is not yet ready to accept calls.
86
        ErrRPCStarting = fmt.Errorf("the RPC server is in the process of " +
87
                "starting up, but not yet ready to accept calls")
88

89
        // macaroonWhitelist defines methods that we don't require macaroons to
90
        // access. We also allow these methods to be called even if not all
91
        // mandatory middlewares are registered yet. If the wallet is locked
92
        // then a middleware cannot register itself, creating an impossible
93
        // situation. Also, a middleware might want to check the state of lnd
94
        // by calling the State service before it registers itself. So we also
95
        // need to exclude those calls from the mandatory middleware check.
96
        macaroonWhitelist = map[string]struct{}{
97
                // We allow all calls to the WalletUnlocker without macaroons.
98
                "/lnrpc.WalletUnlocker/GenSeed":        {},
99
                "/lnrpc.WalletUnlocker/InitWallet":     {},
100
                "/lnrpc.WalletUnlocker/UnlockWallet":   {},
101
                "/lnrpc.WalletUnlocker/ChangePassword": {},
102

103
                // The State service must be available at all times, even
104
                // before we can check macaroons, so we whitelist it.
105
                "/lnrpc.State/SubscribeState": {},
106
                "/lnrpc.State/GetState":       {},
107
        }
108

109
        // allowRemoteSignerWhitelist defines methods that we allow to be called
110
        // when we are waiting for the remote signer to connect, i.e. in the
111
        // allowRemoteSigner state.
112
        allowRemoteSignerWhitelist = map[string]struct{}{
113
                "/walletrpc.WalletKit/SignCoordinatorStreams": {},
114
        }
115
)
116

117
// InterceptorChain is a struct that can be added to the running GRPC server,
118
// intercepting API calls. This is useful for logging, enforcing permissions,
119
// supporting middleware etc. The following diagram shows the order of each
120
// interceptor in the chain and when exactly requests/responses are intercepted
121
// and forwarded to external middleware for approval/modification. Middleware in
122
// general can only intercept gRPC requests/responses that are sent by the
123
// client with a macaroon that contains a custom caveat that is supported by one
124
// of the registered middlewares.
125
//
126
//            |
127
//            | gRPC request from client
128
//            |
129
//        +---v--------------------------------+
130
//        |   InterceptorChain                 |
131
//        +-+----------------------------------+
132
//          | Log Interceptor                  |
133
//          +----------------------------------+
134
//          | RPC State Interceptor            |
135
//          +----------------------------------+
136
//          | Macaroon Interceptor             |
137
//          +----------------------------------+--------> +---------------------+
138
//          | RPC Macaroon Middleware Handler  |<-------- | External Middleware |
139
//          +----------------------------------+          |   - modify request |
140
//          | Prometheus Interceptor           |          +---------------------+
141
//          +-+--------------------------------+
142
//            | validated gRPC request from client
143
//        +---v--------------------------------+
144
//        |   main gRPC server                 |
145
//        +---+--------------------------------+
146
//            |
147
//            | original gRPC request to client
148
//            |
149
//        +---v--------------------------------+--------> +---------------------+
150
//        |   RPC Macaroon Middleware Handler  |<-------- | External Middleware |
151
//        +---+--------------------------------+          |   - modify response |
152
//            |                                           +---------------------+
153
//            | edited gRPC request to client
154
//            v
155
type InterceptorChain struct {
156
        // lastRequestID is the ID of the last gRPC request or stream that was
157
        // intercepted by the middleware interceptor.
158
        //
159
        // NOTE: Must be used atomically!
160
        lastRequestID uint64
161

162
        // Required by the grpc-gateway/v2 library for forward compatibility.
163
        lnrpc.UnimplementedStateServer
164

165
        started sync.Once
166
        stopped sync.Once
167

168
        // state is the current RPC state of our RPC server.
169
        state rpcState
170

171
        // ntfnServer is a subscription server we use to notify clients of the
172
        // State service when the state changes.
173
        ntfnServer *subscribe.Server
174

175
        // noMacaroons should be set true if we don't want to check macaroons.
176
        noMacaroons bool
177

178
        // svc is the macaroon service used to enforce permissions in case
179
        // macaroons are used.
180
        svc *macaroons.Service
181

182
        // permissionMap is the permissions to enforce if macaroons are used.
183
        permissionMap map[string][]bakery.Op
184

185
        // rpcsLog is the logger used to log calls to the RPCs intercepted.
186
        rpcsLog btclog.Logger
187

188
        // registeredMiddleware is a slice of all macaroon permission based RPC
189
        // middleware clients that are currently registered. The
190
        // registeredMiddlewareNames can be used to find the index of a specific
191
        // interceptor within the registeredMiddleware slide using the name of
192
        // the interceptor as the key. The reason for using these two separate
193
        // structures is so that the order in which interceptors are run is
194
        // the same as the order in which they were registered.
195
        registeredMiddleware []*MiddlewareHandler
196

197
        // registeredMiddlewareNames is a map of registered middleware names
198
        // to the index at which they are stored in the registeredMiddleware
199
        // map.
200
        registeredMiddlewareNames map[string]int
201

202
        // mandatoryMiddleware is a list of all middleware that is considered to
203
        // be mandatory. If any of them is not registered then all RPC requests
204
        // (except for the macaroon white listed methods and the middleware
205
        // registration itself) are blocked. This is a security feature to make
206
        // sure that requests can't just go through unobserved/unaudited if a
207
        // middleware crashes.
208
        mandatoryMiddleware []string
209

210
        quit chan struct{}
211
        sync.RWMutex
212
}
213

214
// A compile time check to ensure that InterceptorChain fully implements the
215
// StateServer gRPC service.
216
var _ lnrpc.StateServer = (*InterceptorChain)(nil)
217

218
// NewInterceptorChain creates a new InterceptorChain.
219
func NewInterceptorChain(log btclog.Logger, noMacaroons bool,
UNCOV
220
        mandatoryMiddleware []string) *InterceptorChain {
×
UNCOV
221

×
UNCOV
222
        return &InterceptorChain{
×
UNCOV
223
                state:                     waitingToStart,
×
UNCOV
224
                ntfnServer:                subscribe.NewServer(),
×
UNCOV
225
                noMacaroons:               noMacaroons,
×
UNCOV
226
                permissionMap:             make(map[string][]bakery.Op),
×
UNCOV
227
                rpcsLog:                   log,
×
UNCOV
228
                registeredMiddlewareNames: make(map[string]int),
×
UNCOV
229
                mandatoryMiddleware:       mandatoryMiddleware,
×
UNCOV
230
                quit:                      make(chan struct{}),
×
UNCOV
231
        }
×
UNCOV
232
}
×
233

234
// Start starts the InterceptorChain, which is needed to start the state
235
// subscription server it powers.
UNCOV
236
func (r *InterceptorChain) Start() error {
×
UNCOV
237
        var err error
×
UNCOV
238
        r.started.Do(func() {
×
UNCOV
239
                err = r.ntfnServer.Start()
×
UNCOV
240
        })
×
241

UNCOV
242
        return err
×
243
}
244

245
// Stop stops the InterceptorChain and its internal state subscription server.
UNCOV
246
func (r *InterceptorChain) Stop() error {
×
UNCOV
247
        var err error
×
UNCOV
248
        r.stopped.Do(func() {
×
UNCOV
249
                close(r.quit)
×
UNCOV
250
                err = r.ntfnServer.Stop()
×
UNCOV
251
        })
×
252

UNCOV
253
        return err
×
254
}
255

256
// SetWalletNotCreated moves the RPC state from either waitingToStart to
257
// walletNotCreated.
UNCOV
258
func (r *InterceptorChain) SetWalletNotCreated() {
×
UNCOV
259
        r.Lock()
×
UNCOV
260
        defer r.Unlock()
×
UNCOV
261

×
UNCOV
262
        r.state = walletNotCreated
×
UNCOV
263
        _ = r.ntfnServer.SendUpdate(r.state)
×
UNCOV
264
}
×
265

266
// SetWalletLocked moves the RPC state from either walletNotCreated to
267
// walletLocked.
UNCOV
268
func (r *InterceptorChain) SetWalletLocked() {
×
UNCOV
269
        r.Lock()
×
UNCOV
270
        defer r.Unlock()
×
UNCOV
271

×
UNCOV
272
        r.state = walletLocked
×
UNCOV
273
        _ = r.ntfnServer.SendUpdate(r.state)
×
UNCOV
274
}
×
275

276
// SetWalletUnlocked moves the RPC state from either walletNotCreated or
277
// walletLocked to walletUnlocked.
UNCOV
278
func (r *InterceptorChain) SetWalletUnlocked() {
×
UNCOV
279
        r.Lock()
×
UNCOV
280
        defer r.Unlock()
×
UNCOV
281

×
UNCOV
282
        r.state = walletUnlocked
×
UNCOV
283
        _ = r.ntfnServer.SendUpdate(r.state)
×
UNCOV
284
}
×
285

286
// SetAllowRemoteSigner moves the RPC state from walletUnlocked to
287
// waitRemoteSigner.
NEW
288
func (r *InterceptorChain) SetAllowRemoteSigner() {
×
NEW
289
        r.Lock()
×
NEW
290
        defer r.Unlock()
×
NEW
291

×
NEW
292
        r.state = allowRemoteSigner
×
NEW
293
        _ = r.ntfnServer.SendUpdate(r.state)
×
NEW
294
}
×
295

296
// SetRPCActive moves the RPC state from either walletUnlocked or
297
// waitRemoteSigner to rpcActive.
UNCOV
298
func (r *InterceptorChain) SetRPCActive() {
×
UNCOV
299
        r.Lock()
×
UNCOV
300
        defer r.Unlock()
×
UNCOV
301

×
UNCOV
302
        r.state = rpcActive
×
UNCOV
303
        _ = r.ntfnServer.SendUpdate(r.state)
×
UNCOV
304
}
×
305

306
// SetServerActive moves the RPC state from rpcActive to serverActive.
UNCOV
307
func (r *InterceptorChain) SetServerActive() {
×
UNCOV
308
        r.Lock()
×
UNCOV
309
        defer r.Unlock()
×
UNCOV
310

×
UNCOV
311
        r.state = serverActive
×
UNCOV
312
        _ = r.ntfnServer.SendUpdate(r.state)
×
UNCOV
313
}
×
314

315
// rpcStateToWalletState converts rpcState to lnrpc.WalletState. Returns
316
// WAITING_TO_START and an error on conversion error.
UNCOV
317
func rpcStateToWalletState(state rpcState) (lnrpc.WalletState, error) {
×
UNCOV
318
        const defaultState = lnrpc.WalletState_WAITING_TO_START
×
UNCOV
319
        var walletState lnrpc.WalletState
×
UNCOV
320

×
UNCOV
321
        switch state {
×
UNCOV
322
        case waitingToStart:
×
UNCOV
323
                walletState = lnrpc.WalletState_WAITING_TO_START
×
UNCOV
324
        case walletNotCreated:
×
UNCOV
325
                walletState = lnrpc.WalletState_NON_EXISTING
×
UNCOV
326
        case walletLocked:
×
UNCOV
327
                walletState = lnrpc.WalletState_LOCKED
×
UNCOV
328
        case walletUnlocked:
×
UNCOV
329
                walletState = lnrpc.WalletState_UNLOCKED
×
NEW
330
        case allowRemoteSigner:
×
NEW
331
                walletState = lnrpc.WalletState_ALLOW_REMOTE_SIGNER
×
UNCOV
332
        case rpcActive:
×
UNCOV
333
                walletState = lnrpc.WalletState_RPC_ACTIVE
×
UNCOV
334
        case serverActive:
×
UNCOV
335
                walletState = lnrpc.WalletState_SERVER_ACTIVE
×
336

337
        default:
×
338
                return defaultState, fmt.Errorf("unknown wallet state %v", state)
×
339
        }
340

UNCOV
341
        return walletState, nil
×
342
}
343

344
// SubscribeState subscribes to the state of the wallet. The current wallet
345
// state will always be delivered immediately.
346
//
347
// NOTE: Part of the StateService interface.
348
func (r *InterceptorChain) SubscribeState(_ *lnrpc.SubscribeStateRequest,
UNCOV
349
        stream lnrpc.State_SubscribeStateServer) error {
×
UNCOV
350

×
UNCOV
351
        sendStateUpdate := func(state rpcState) error {
×
UNCOV
352
                walletState, err := rpcStateToWalletState(state)
×
UNCOV
353
                if err != nil {
×
354
                        return err
×
355
                }
×
356

UNCOV
357
                return stream.Send(&lnrpc.SubscribeStateResponse{
×
UNCOV
358
                        State: walletState,
×
UNCOV
359
                })
×
360
        }
361

362
        // Subscribe to state updates.
UNCOV
363
        client, err := r.ntfnServer.Subscribe()
×
UNCOV
364
        if err != nil {
×
365
                return err
×
366
        }
×
UNCOV
367
        defer client.Cancel()
×
UNCOV
368

×
UNCOV
369
        // Always start by sending the current state.
×
UNCOV
370
        r.RLock()
×
UNCOV
371
        state := r.state
×
UNCOV
372
        r.RUnlock()
×
UNCOV
373

×
UNCOV
374
        if err := sendStateUpdate(state); err != nil {
×
375
                return err
×
376
        }
×
377

UNCOV
378
        for {
×
UNCOV
379
                select {
×
UNCOV
380
                case e := <-client.Updates():
×
UNCOV
381
                        newState := e.(rpcState)
×
UNCOV
382

×
UNCOV
383
                        // Ignore already sent state.
×
UNCOV
384
                        if newState == state {
×
385
                                continue
×
386
                        }
387

UNCOV
388
                        state = newState
×
UNCOV
389
                        err := sendStateUpdate(state)
×
UNCOV
390
                        if err != nil {
×
UNCOV
391
                                return err
×
UNCOV
392
                        }
×
393

394
                // The response stream's context for whatever reason has been
395
                // closed. If context is closed by an exceeded deadline we will
396
                // return an error.
UNCOV
397
                case <-stream.Context().Done():
×
UNCOV
398
                        if errors.Is(stream.Context().Err(), context.Canceled) {
×
UNCOV
399
                                return nil
×
UNCOV
400
                        }
×
401
                        return stream.Context().Err()
×
402

403
                case <-r.quit:
×
404
                        return fmt.Errorf("server exiting")
×
405
                }
406
        }
407
}
408

409
// GetState returns the current wallet state.
410
func (r *InterceptorChain) GetState(_ context.Context,
411
        _ *lnrpc.GetStateRequest) (*lnrpc.GetStateResponse, error) {
×
412

×
413
        r.RLock()
×
414
        state := r.state
×
415
        r.RUnlock()
×
416

×
417
        walletState, err := rpcStateToWalletState(state)
×
418
        if err != nil {
×
419
                return nil, err
×
420
        }
×
421

422
        return &lnrpc.GetStateResponse{
×
423
                State: walletState,
×
424
        }, nil
×
425
}
426

427
// AddMacaroonService adds a macaroon service to the interceptor. After this is
428
// done every RPC call made will have to pass a valid macaroon to be accepted.
UNCOV
429
func (r *InterceptorChain) AddMacaroonService(svc *macaroons.Service) {
×
UNCOV
430
        r.Lock()
×
UNCOV
431
        defer r.Unlock()
×
UNCOV
432

×
UNCOV
433
        r.svc = svc
×
UNCOV
434
}
×
435

436
// MacaroonService returns the currently registered macaroon service. This might
437
// be nil if none was registered (yet).
UNCOV
438
func (r *InterceptorChain) MacaroonService() *macaroons.Service {
×
UNCOV
439
        r.RLock()
×
UNCOV
440
        defer r.RUnlock()
×
UNCOV
441

×
UNCOV
442
        return r.svc
×
UNCOV
443
}
×
444

445
// AddPermission adds a new macaroon rule for the given method.
UNCOV
446
func (r *InterceptorChain) AddPermission(method string, ops []bakery.Op) error {
×
UNCOV
447
        r.Lock()
×
UNCOV
448
        defer r.Unlock()
×
UNCOV
449

×
UNCOV
450
        if _, ok := r.permissionMap[method]; ok {
×
451
                return fmt.Errorf("detected duplicate macaroon constraints "+
×
452
                        "for path: %v", method)
×
453
        }
×
454

UNCOV
455
        r.permissionMap[method] = ops
×
UNCOV
456
        return nil
×
457
}
458

459
// Permissions returns the current set of macaroon permissions.
UNCOV
460
func (r *InterceptorChain) Permissions() map[string][]bakery.Op {
×
UNCOV
461
        r.RLock()
×
UNCOV
462
        defer r.RUnlock()
×
UNCOV
463

×
UNCOV
464
        // Make a copy under the read lock to avoid races.
×
UNCOV
465
        c := make(map[string][]bakery.Op)
×
UNCOV
466
        for k, v := range r.permissionMap {
×
UNCOV
467
                s := make([]bakery.Op, len(v))
×
UNCOV
468
                copy(s, v)
×
UNCOV
469
                c[k] = s
×
UNCOV
470
        }
×
471

UNCOV
472
        return c
×
473
}
474

475
// RegisterMiddleware registers a new middleware that will handle request/
476
// response interception for all RPC messages that are initiated with a custom
477
// macaroon caveat. The name of the custom caveat a middleware is handling is
478
// also its unique identifier. Only one middleware can be registered for each
479
// custom caveat.
UNCOV
480
func (r *InterceptorChain) RegisterMiddleware(mw *MiddlewareHandler) error {
×
UNCOV
481
        r.Lock()
×
UNCOV
482
        defer r.Unlock()
×
UNCOV
483

×
UNCOV
484
        // The name of the middleware is the unique identifier.
×
UNCOV
485
        _, ok := r.registeredMiddlewareNames[mw.middlewareName]
×
UNCOV
486
        if ok {
×
487
                return fmt.Errorf("a middleware with the name '%s' is already "+
×
488
                        "registered", mw.middlewareName)
×
489
        }
×
490

491
        // For now, we only want one middleware per custom caveat name. If we
492
        // allowed multiple middlewares handling the same caveat there would be
493
        // a need for extra call chaining logic, and they could overwrite each
494
        // other's responses.
UNCOV
495
        for _, middleware := range r.registeredMiddleware {
×
496
                if middleware.customCaveatName == mw.customCaveatName {
×
497
                        return fmt.Errorf("a middleware is already registered "+
×
498
                                "for the custom caveat name '%s': %v",
×
499
                                mw.customCaveatName, middleware.middlewareName)
×
500
                }
×
501
        }
502

UNCOV
503
        r.registeredMiddleware = append(r.registeredMiddleware, mw)
×
UNCOV
504
        index := len(r.registeredMiddleware) - 1
×
UNCOV
505
        r.registeredMiddlewareNames[mw.middlewareName] = index
×
UNCOV
506

×
UNCOV
507
        return nil
×
508
}
509

510
// RemoveMiddleware removes the middleware that handles the given custom caveat
511
// name.
UNCOV
512
func (r *InterceptorChain) RemoveMiddleware(middlewareName string) {
×
UNCOV
513
        r.Lock()
×
UNCOV
514
        defer r.Unlock()
×
UNCOV
515

×
UNCOV
516
        log.Debugf("Removing middleware %s", middlewareName)
×
UNCOV
517

×
UNCOV
518
        index, ok := r.registeredMiddlewareNames[middlewareName]
×
UNCOV
519
        if !ok {
×
520
                return
×
521
        }
×
UNCOV
522
        delete(r.registeredMiddlewareNames, middlewareName)
×
UNCOV
523

×
UNCOV
524
        r.registeredMiddleware = append(
×
UNCOV
525
                r.registeredMiddleware[:index],
×
UNCOV
526
                r.registeredMiddleware[index+1:]...,
×
UNCOV
527
        )
×
UNCOV
528

×
UNCOV
529
        // Re-initialise the middleware look-up map with the updated indexes.
×
UNCOV
530
        r.registeredMiddlewareNames = make(map[string]int)
×
UNCOV
531
        for i, mw := range r.registeredMiddleware {
×
532
                r.registeredMiddlewareNames[mw.middlewareName] = i
×
533
        }
×
534
}
535

536
// CustomCaveatSupported makes sure a middleware that handles the given custom
537
// caveat name is registered. If none is, an error is returned, signalling to
538
// the macaroon bakery and its validator to reject macaroons that have a custom
539
// caveat with that name.
540
//
541
// NOTE: This method is part of the macaroons.CustomCaveatAcceptor interface.
UNCOV
542
func (r *InterceptorChain) CustomCaveatSupported(customCaveatName string) error {
×
UNCOV
543
        r.RLock()
×
UNCOV
544
        defer r.RUnlock()
×
UNCOV
545

×
UNCOV
546
        // We only accept requests with a custom caveat if we also have a
×
UNCOV
547
        // middleware registered that handles that custom caveat. That is
×
UNCOV
548
        // crucial for security! Otherwise a request with an encumbered (=has
×
UNCOV
549
        // restricted permissions based upon the custom caveat condition)
×
UNCOV
550
        // macaroon would not be validated against the limitations that the
×
UNCOV
551
        // custom caveat implicate. Since the map is keyed by the _name_ of the
×
UNCOV
552
        // middleware, we need to loop through all of them to see if one has
×
UNCOV
553
        // the given custom macaroon caveat name.
×
UNCOV
554
        for _, middleware := range r.registeredMiddleware {
×
UNCOV
555
                if middleware.customCaveatName == customCaveatName {
×
UNCOV
556
                        return nil
×
UNCOV
557
                }
×
558
        }
559

UNCOV
560
        return fmt.Errorf("cannot accept macaroon with custom caveat '%s', "+
×
UNCOV
561
                "no middleware registered to handle it", customCaveatName)
×
562
}
563

564
// CreateServerOpts creates the GRPC server options that can be added to a GRPC
565
// server in order to add this InterceptorChain.
UNCOV
566
func (r *InterceptorChain) CreateServerOpts() []grpc.ServerOption {
×
UNCOV
567
        var unaryInterceptors []grpc.UnaryServerInterceptor
×
UNCOV
568
        var strmInterceptors []grpc.StreamServerInterceptor
×
UNCOV
569

×
UNCOV
570
        // The first interceptors we'll add to the chain is our logging
×
UNCOV
571
        // interceptors, so we can automatically log all errors that happen
×
UNCOV
572
        // during RPC calls.
×
UNCOV
573
        unaryInterceptors = append(
×
UNCOV
574
                unaryInterceptors, errorLogUnaryServerInterceptor(r.rpcsLog),
×
UNCOV
575
        )
×
UNCOV
576
        strmInterceptors = append(
×
UNCOV
577
                strmInterceptors, errorLogStreamServerInterceptor(r.rpcsLog),
×
UNCOV
578
        )
×
UNCOV
579

×
UNCOV
580
        // Next we'll add our RPC state check interceptors, that will check
×
UNCOV
581
        // whether the attempted call is allowed in the current state.
×
UNCOV
582
        unaryInterceptors = append(
×
UNCOV
583
                unaryInterceptors, r.rpcStateUnaryServerInterceptor(),
×
UNCOV
584
        )
×
UNCOV
585
        strmInterceptors = append(
×
UNCOV
586
                strmInterceptors, r.rpcStateStreamServerInterceptor(),
×
UNCOV
587
        )
×
UNCOV
588

×
UNCOV
589
        // We'll add the macaroon interceptors. If macaroons aren't disabled,
×
UNCOV
590
        // then these interceptors will enforce macaroon authentication.
×
UNCOV
591
        unaryInterceptors = append(
×
UNCOV
592
                unaryInterceptors, r.MacaroonUnaryServerInterceptor(),
×
UNCOV
593
        )
×
UNCOV
594
        strmInterceptors = append(
×
UNCOV
595
                strmInterceptors, r.MacaroonStreamServerInterceptor(),
×
UNCOV
596
        )
×
UNCOV
597

×
UNCOV
598
        // Next, we'll add the interceptors for our custom macaroon caveat based
×
UNCOV
599
        // middleware.
×
UNCOV
600
        unaryInterceptors = append(
×
UNCOV
601
                unaryInterceptors, r.middlewareUnaryServerInterceptor(),
×
UNCOV
602
        )
×
UNCOV
603
        strmInterceptors = append(
×
UNCOV
604
                strmInterceptors, r.middlewareStreamServerInterceptor(),
×
UNCOV
605
        )
×
UNCOV
606

×
UNCOV
607
        // Get interceptors for Prometheus to gather gRPC performance metrics.
×
UNCOV
608
        // If monitoring is disabled, GetPromInterceptors() will return empty
×
UNCOV
609
        // slices.
×
UNCOV
610
        promUnaryInterceptors, promStrmInterceptors :=
×
UNCOV
611
                monitoring.GetPromInterceptors()
×
UNCOV
612

×
UNCOV
613
        // Concatenate the slices of unary and stream interceptors respectively.
×
UNCOV
614
        unaryInterceptors = append(unaryInterceptors, promUnaryInterceptors...)
×
UNCOV
615
        strmInterceptors = append(strmInterceptors, promStrmInterceptors...)
×
UNCOV
616

×
UNCOV
617
        // Create server options from the interceptors we just set up.
×
UNCOV
618
        chainedUnary := grpc_middleware.WithUnaryServerChain(
×
UNCOV
619
                unaryInterceptors...,
×
UNCOV
620
        )
×
UNCOV
621
        chainedStream := grpc_middleware.WithStreamServerChain(
×
UNCOV
622
                strmInterceptors...,
×
UNCOV
623
        )
×
UNCOV
624
        serverOpts := []grpc.ServerOption{chainedUnary, chainedStream}
×
UNCOV
625

×
UNCOV
626
        return serverOpts
×
UNCOV
627
}
×
628

629
// errorLogUnaryServerInterceptor is a simple UnaryServerInterceptor that will
630
// automatically log any errors that occur when serving a client's unary
631
// request.
UNCOV
632
func errorLogUnaryServerInterceptor(logger btclog.Logger) grpc.UnaryServerInterceptor {
×
UNCOV
633
        return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
×
UNCOV
634
                handler grpc.UnaryHandler) (interface{}, error) {
×
UNCOV
635

×
UNCOV
636
                resp, err := handler(ctx, req)
×
UNCOV
637
                if err != nil {
×
UNCOV
638
                        // TODO(roasbeef): also log request details?
×
UNCOV
639
                        logger.Errorf("[%v]: %v", info.FullMethod, err)
×
UNCOV
640
                }
×
641

UNCOV
642
                return resp, err
×
643
        }
644
}
645

646
// errorLogStreamServerInterceptor is a simple StreamServerInterceptor that
647
// will log any errors that occur while processing a client or server streaming
648
// RPC.
UNCOV
649
func errorLogStreamServerInterceptor(logger btclog.Logger) grpc.StreamServerInterceptor {
×
UNCOV
650
        return func(srv interface{}, ss grpc.ServerStream,
×
UNCOV
651
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
UNCOV
652

×
UNCOV
653
                err := handler(srv, ss)
×
UNCOV
654
                if err != nil {
×
UNCOV
655
                        logger.Errorf("[%v]: %v", info.FullMethod, err)
×
UNCOV
656
                }
×
657

UNCOV
658
                return err
×
659
        }
660
}
661

662
// checkMacaroon validates that the context contains the macaroon needed to
663
// invoke the given RPC method.
664
func (r *InterceptorChain) checkMacaroon(ctx context.Context,
UNCOV
665
        fullMethod string) error {
×
UNCOV
666

×
UNCOV
667
        // If noMacaroons is set, we'll always allow the call.
×
UNCOV
668
        if r.noMacaroons {
×
669
                return nil
×
670
        }
×
671

672
        // Check whether the method is whitelisted, if so we'll allow it
673
        // regardless of macaroons.
UNCOV
674
        _, ok := macaroonWhitelist[fullMethod]
×
UNCOV
675
        if ok {
×
UNCOV
676
                return nil
×
UNCOV
677
        }
×
678

UNCOV
679
        r.RLock()
×
UNCOV
680
        svc := r.svc
×
UNCOV
681
        r.RUnlock()
×
UNCOV
682

×
UNCOV
683
        // If the macaroon service is not yet active, we cannot allow
×
UNCOV
684
        // the call.
×
UNCOV
685
        if svc == nil {
×
686
                return fmt.Errorf("unable to determine macaroon permissions")
×
687
        }
×
688

UNCOV
689
        r.RLock()
×
UNCOV
690
        uriPermissions, ok := r.permissionMap[fullMethod]
×
UNCOV
691
        r.RUnlock()
×
UNCOV
692
        if !ok {
×
693
                return fmt.Errorf("%s: unknown permissions required for method",
×
694
                        fullMethod)
×
695
        }
×
696

697
        // Find out if there is an external validator registered for
698
        // this method. Fall back to the internal one if there isn't.
UNCOV
699
        validator, ok := svc.ExternalValidators[fullMethod]
×
UNCOV
700
        if !ok {
×
UNCOV
701
                validator = svc
×
UNCOV
702
        }
×
703

704
        // Now that we know what validator to use, let it do its work.
UNCOV
705
        return validator.ValidateMacaroon(ctx, uriPermissions, fullMethod)
×
706
}
707

708
// MacaroonUnaryServerInterceptor is a GRPC interceptor that checks whether the
709
// request is authorized by the included macaroons.
UNCOV
710
func (r *InterceptorChain) MacaroonUnaryServerInterceptor() grpc.UnaryServerInterceptor {
×
UNCOV
711
        return func(ctx context.Context, req interface{},
×
UNCOV
712
                info *grpc.UnaryServerInfo,
×
UNCOV
713
                handler grpc.UnaryHandler) (interface{}, error) {
×
UNCOV
714

×
UNCOV
715
                // Check macaroons.
×
UNCOV
716
                if err := r.checkMacaroon(ctx, info.FullMethod); err != nil {
×
UNCOV
717
                        return nil, err
×
UNCOV
718
                }
×
719

UNCOV
720
                return handler(ctx, req)
×
721
        }
722
}
723

724
// MacaroonStreamServerInterceptor is a GRPC interceptor that checks whether
725
// the request is authorized by the included macaroons.
UNCOV
726
func (r *InterceptorChain) MacaroonStreamServerInterceptor() grpc.StreamServerInterceptor {
×
UNCOV
727
        return func(srv interface{}, ss grpc.ServerStream,
×
UNCOV
728
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
UNCOV
729

×
UNCOV
730
                // Check macaroons.
×
UNCOV
731
                err := r.checkMacaroon(ss.Context(), info.FullMethod)
×
UNCOV
732
                if err != nil {
×
733
                        return err
×
734
                }
×
735

UNCOV
736
                return handler(srv, ss)
×
737
        }
738
}
739

740
// checkRPCState checks whether a call to the given server is allowed in the
741
// current RPC state.
742
func (r *InterceptorChain) checkRPCState(srv interface{},
NEW
743
        fullMethod string) error {
×
NEW
744

×
UNCOV
745
        // The StateService is being accessed, we allow the call regardless of
×
UNCOV
746
        // the current state.
×
UNCOV
747
        _, ok := srv.(lnrpc.StateServer)
×
UNCOV
748
        if ok {
×
UNCOV
749
                return nil
×
UNCOV
750
        }
×
751

UNCOV
752
        r.RLock()
×
UNCOV
753
        state := r.state
×
UNCOV
754
        r.RUnlock()
×
UNCOV
755

×
UNCOV
756
        switch state {
×
757
        // Do not accept any RPC calls (unless to the state service) until LND
758
        // has not started.
759
        case waitingToStart:
×
760
                return ErrWaitingToStart
×
761

762
        // If the wallet does not exists, only calls to the WalletUnlocker are
763
        // accepted.
UNCOV
764
        case walletNotCreated:
×
UNCOV
765
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
UNCOV
766
                if !ok {
×
767
                        return ErrNoWallet
×
768
                }
×
769

770
        // If the wallet is locked, only calls to the WalletUnlocker are
771
        // accepted.
UNCOV
772
        case walletLocked:
×
UNCOV
773
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
UNCOV
774
                if !ok {
×
775
                        return ErrWalletLocked
×
776
                }
×
777

778
        // If the wallet is unlocked, but the RPC not yet active, we reject.
779
        case walletUnlocked:
×
780
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
781
                if ok {
×
782
                        return ErrWalletUnlocked
×
783
                }
×
784

785
                return ErrRPCStarting
×
786

787
        // If lnd is waiting for the remote signer to connect, we only allow
788
        // calls to the remote signer.
NEW
789
        case allowRemoteSigner:
×
NEW
790
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
NEW
791
                if ok {
×
NEW
792
                        return ErrWalletUnlocked
×
NEW
793
                }
×
794

795
                // As we only allow calls to connect the remote signer until the
796
                // full rpc server is active, we check whether the method is
797
                // whitelisted or not.
NEW
798
                _, ok = allowRemoteSignerWhitelist[fullMethod]
×
NEW
799
                if !ok {
×
NEW
800
                        return ErrAwaitingRemoteSigner
×
NEW
801
                }
×
802

803
        // If the RPC server or lnd server is active, we allow calls to any
804
        // service except the WalletUnlocker.
UNCOV
805
        case rpcActive, serverActive:
×
UNCOV
806
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
UNCOV
807
                if ok {
×
808
                        return ErrWalletUnlocked
×
809
                }
×
810

811
        default:
×
812
                return fmt.Errorf("unknown RPC state: %v", state)
×
813
        }
814

UNCOV
815
        return nil
×
816
}
817

818
// rpcStateUnaryServerInterceptor is a GRPC interceptor that checks whether
819
// calls to the given gGRPC server is allowed in the current rpc state.
UNCOV
820
func (r *InterceptorChain) rpcStateUnaryServerInterceptor() grpc.UnaryServerInterceptor {
×
UNCOV
821
        return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
×
UNCOV
822
                handler grpc.UnaryHandler) (interface{}, error) {
×
UNCOV
823

×
NEW
824
                method := info.FullMethod
×
NEW
825
                r.rpcsLog.Debugf("[%v] requested", method)
×
UNCOV
826

×
NEW
827
                if err := r.checkRPCState(info.Server, method); err != nil {
×
828
                        return nil, err
×
829
                }
×
830

UNCOV
831
                return handler(ctx, req)
×
832
        }
833
}
834

835
// rpcStateStreamServerInterceptor is a GRPC interceptor that checks whether
836
// calls to the given gGRPC server is allowed in the current rpc state.
UNCOV
837
func (r *InterceptorChain) rpcStateStreamServerInterceptor() grpc.StreamServerInterceptor {
×
UNCOV
838
        return func(srv interface{}, ss grpc.ServerStream,
×
UNCOV
839
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
UNCOV
840

×
UNCOV
841
                r.rpcsLog.Debugf("[%v] requested", info.FullMethod)
×
UNCOV
842

×
NEW
843
                if err := r.checkRPCState(srv, info.FullMethod); err != nil {
×
844
                        return err
×
845
                }
×
846

UNCOV
847
                return handler(srv, ss)
×
848
        }
849
}
850

851
// middlewareUnaryServerInterceptor is a unary gRPC interceptor that intercepts
852
// all requests and responses that are sent with a macaroon containing a custom
853
// caveat condition that is handled by registered middleware.
UNCOV
854
func (r *InterceptorChain) middlewareUnaryServerInterceptor() grpc.UnaryServerInterceptor {
×
UNCOV
855
        return func(ctx context.Context,
×
UNCOV
856
                req interface{}, info *grpc.UnaryServerInfo,
×
UNCOV
857
                handler grpc.UnaryHandler) (interface{}, error) {
×
UNCOV
858

×
UNCOV
859
                // Make sure we don't allow any requests through if one of the
×
UNCOV
860
                // mandatory middlewares is missing.
×
UNCOV
861
                fullMethod := info.FullMethod
×
UNCOV
862
                if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
×
863
                        return nil, err
×
864
                }
×
865

866
                // If there is no middleware registered, we don't need to
867
                // intercept anything.
UNCOV
868
                if !r.middlewareRegistered() {
×
UNCOV
869
                        return handler(ctx, req)
×
UNCOV
870
                }
×
871

UNCOV
872
                requestID := atomic.AddUint64(&r.lastRequestID, 1)
×
UNCOV
873
                req, err := r.interceptMessage(
×
UNCOV
874
                        ctx, TypeRequest, requestID, false, info.FullMethod,
×
UNCOV
875
                        req,
×
UNCOV
876
                )
×
UNCOV
877
                if err != nil {
×
878
                        return nil, err
×
879
                }
×
880

881
                // Call the handler, which executes the request against lnd.
UNCOV
882
                lndResp, lndErr := handler(ctx, req)
×
UNCOV
883
                if lndErr != nil {
×
UNCOV
884
                        // The call to lnd ended in an error and not a normal
×
UNCOV
885
                        // proto message response. Send the error to the
×
UNCOV
886
                        // interceptor as well to inform about the abnormal
×
UNCOV
887
                        // termination of the stream and to give the option to
×
UNCOV
888
                        // replace the error message with a custom one.
×
UNCOV
889
                        replacedErr, err := r.interceptMessage(
×
UNCOV
890
                                ctx, TypeResponse, requestID, false,
×
UNCOV
891
                                info.FullMethod, lndErr,
×
UNCOV
892
                        )
×
UNCOV
893
                        if err != nil {
×
894
                                return nil, err
×
895
                        }
×
UNCOV
896
                        return lndResp, replacedErr.(error)
×
897
                }
898

UNCOV
899
                return r.interceptMessage(
×
UNCOV
900
                        ctx, TypeResponse, requestID, false, info.FullMethod,
×
UNCOV
901
                        lndResp,
×
UNCOV
902
                )
×
903
        }
904
}
905

906
// middlewareStreamServerInterceptor is a streaming gRPC interceptor that
907
// intercepts all requests and responses that are sent with a macaroon
908
// containing a custom caveat condition that is handled by registered
909
// middleware.
UNCOV
910
func (r *InterceptorChain) middlewareStreamServerInterceptor() grpc.StreamServerInterceptor {
×
UNCOV
911
        return func(srv interface{},
×
UNCOV
912
                ss grpc.ServerStream, info *grpc.StreamServerInfo,
×
UNCOV
913
                handler grpc.StreamHandler) error {
×
UNCOV
914

×
UNCOV
915
                // Don't intercept the interceptor itself which is a streaming
×
UNCOV
916
                // RPC too!
×
UNCOV
917
                fullMethod := info.FullMethod
×
UNCOV
918
                if fullMethod == lnrpc.RegisterRPCMiddlewareURI {
×
UNCOV
919
                        return handler(srv, ss)
×
UNCOV
920
                }
×
921

922
                // Make sure we don't allow any requests through if one of the
923
                // mandatory middlewares is missing. We add this check here to
924
                // make sure the middleware registration itself can still be
925
                // called.
UNCOV
926
                if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
×
927
                        return err
×
928
                }
×
929

930
                // If there is no middleware registered, we don't need to
931
                // intercept anything.
UNCOV
932
                if !r.middlewareRegistered() {
×
UNCOV
933
                        return handler(srv, ss)
×
UNCOV
934
                }
×
935

936
                // To give the middleware a chance to accept or reject the
937
                // establishment of the stream itself (and not only when the
938
                // first message is sent on the stream), we send an intercept
939
                // request for the stream auth now:
UNCOV
940
                msg, err := NewStreamAuthInterceptionRequest(
×
UNCOV
941
                        ss.Context(), info.FullMethod,
×
UNCOV
942
                )
×
UNCOV
943
                if err != nil {
×
944
                        return err
×
945
                }
×
946

UNCOV
947
                requestID := atomic.AddUint64(&r.lastRequestID, 1)
×
UNCOV
948
                err = r.acceptStream(requestID, msg)
×
UNCOV
949
                if err != nil {
×
950
                        return err
×
951
                }
×
952

UNCOV
953
                wrappedSS := &serverStreamWrapper{
×
UNCOV
954
                        ServerStream: ss,
×
UNCOV
955
                        requestID:    requestID,
×
UNCOV
956
                        fullMethod:   info.FullMethod,
×
UNCOV
957
                        interceptor:  r,
×
UNCOV
958
                }
×
UNCOV
959

×
UNCOV
960
                // Call the stream handler, which will block as long as the
×
UNCOV
961
                // stream is alive.
×
UNCOV
962
                lndErr := handler(srv, wrappedSS)
×
UNCOV
963
                if lndErr != nil {
×
UNCOV
964
                        // This is an error being returned from lnd. Send it to
×
UNCOV
965
                        // the interceptor as well to inform about the abnormal
×
UNCOV
966
                        // termination of the stream and to give the option to
×
UNCOV
967
                        // replace the error message with a custom one.
×
UNCOV
968
                        replacedErr, err := r.interceptMessage(
×
UNCOV
969
                                ss.Context(), TypeResponse, requestID,
×
UNCOV
970
                                true, info.FullMethod, lndErr,
×
UNCOV
971
                        )
×
UNCOV
972
                        if err != nil {
×
973
                                return err
×
974
                        }
×
975

UNCOV
976
                        return replacedErr.(error)
×
977
                }
978

979
                // Normal/successful termination of the stream.
UNCOV
980
                return nil
×
981
        }
982
}
983

984
// checkMandatoryMiddleware makes sure that each of the middlewares declared as
985
// mandatory is currently registered.
UNCOV
986
func (r *InterceptorChain) checkMandatoryMiddleware(fullMethod string) error {
×
UNCOV
987
        r.RLock()
×
UNCOV
988
        defer r.RUnlock()
×
UNCOV
989

×
UNCOV
990
        // Allow calls that are whitelisted for macaroons as well, otherwise we
×
UNCOV
991
        // get into impossible situations where the wallet is locked but the
×
UNCOV
992
        // unlock call is denied because the middleware isn't registered. But
×
UNCOV
993
        // the middleware cannot register itself because the wallet is locked.
×
UNCOV
994
        if _, ok := macaroonWhitelist[fullMethod]; ok {
×
UNCOV
995
                return nil
×
UNCOV
996
        }
×
997

998
        // Not a white listed call so make sure every mandatory middleware is
999
        // currently connected to lnd.
UNCOV
1000
        for _, name := range r.mandatoryMiddleware {
×
1001
                if _, ok := r.registeredMiddlewareNames[name]; !ok {
×
1002
                        return fmt.Errorf("mandatory middleware '%s' is "+
×
1003
                                "currently not registered, not allowing any "+
×
1004
                                "RPC calls", name)
×
1005
                }
×
1006
        }
1007

UNCOV
1008
        return nil
×
1009
}
1010

1011
// middlewareRegistered returns true if there is at least one middleware
1012
// currently registered.
UNCOV
1013
func (r *InterceptorChain) middlewareRegistered() bool {
×
UNCOV
1014
        r.RLock()
×
UNCOV
1015
        defer r.RUnlock()
×
UNCOV
1016

×
UNCOV
1017
        return len(r.registeredMiddleware) > 0
×
UNCOV
1018
}
×
1019

1020
// acceptStream sends an intercept request to all middlewares that have
1021
// registered for it. This means either a middleware has requested read-only
1022
// access or the request actually has a macaroon with a caveat the middleware
1023
// registered for.
1024
func (r *InterceptorChain) acceptStream(requestID uint64,
UNCOV
1025
        msg *InterceptionRequest) error {
×
UNCOV
1026

×
UNCOV
1027
        r.RLock()
×
UNCOV
1028
        defer r.RUnlock()
×
UNCOV
1029

×
UNCOV
1030
        for _, middleware := range r.registeredMiddleware {
×
UNCOV
1031
                // If there is a custom caveat in the macaroon, make sure the
×
UNCOV
1032
                // middleware registered for it. Or if a middleware registered
×
UNCOV
1033
                // for read-only mode, it also gets the request.
×
UNCOV
1034
                hasCustomCaveat := macaroons.HasCustomCaveat(
×
UNCOV
1035
                        msg.Macaroon, middleware.customCaveatName,
×
UNCOV
1036
                )
×
UNCOV
1037
                if !hasCustomCaveat && !middleware.readOnly {
×
1038
                        continue
×
1039
                }
1040

UNCOV
1041
                msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
×
UNCOV
1042
                        msg.Macaroon, middleware.customCaveatName,
×
UNCOV
1043
                )
×
UNCOV
1044

×
UNCOV
1045
                resp, err := middleware.intercept(requestID, msg)
×
UNCOV
1046

×
UNCOV
1047
                // Error during interception itself.
×
UNCOV
1048
                if err != nil {
×
1049
                        return err
×
1050
                }
×
1051

1052
                // Error returned from middleware client.
UNCOV
1053
                if resp.err != nil {
×
1054
                        return resp.err
×
1055
                }
×
1056
        }
1057

UNCOV
1058
        return nil
×
1059
}
1060

1061
// interceptMessage sends out an intercept request for an RPC response. Since
1062
// middleware that hasn't registered for the read-only mode has the option to
1063
// overwrite/replace the message, this needs to be handled differently than the
1064
// auth path above.
1065
func (r *InterceptorChain) interceptMessage(ctx context.Context,
1066
        interceptType InterceptType, requestID uint64, isStream bool,
UNCOV
1067
        fullMethod string, m interface{}) (interface{}, error) {
×
UNCOV
1068

×
UNCOV
1069
        r.RLock()
×
UNCOV
1070
        defer r.RUnlock()
×
UNCOV
1071

×
UNCOV
1072
        currentMessage := m
×
UNCOV
1073
        for _, middleware := range r.registeredMiddleware {
×
UNCOV
1074
                msg, err := NewMessageInterceptionRequest(
×
UNCOV
1075
                        ctx, interceptType, isStream, fullMethod,
×
UNCOV
1076
                        currentMessage,
×
UNCOV
1077
                )
×
UNCOV
1078
                if err != nil {
×
1079
                        return nil, err
×
1080
                }
×
1081

1082
                // If there is a custom caveat in the macaroon, make sure the
1083
                // middleware registered for it. Or if a middleware registered
1084
                // for read-only mode, it also gets the request.
UNCOV
1085
                hasCustomCaveat := macaroons.HasCustomCaveat(
×
UNCOV
1086
                        msg.Macaroon, middleware.customCaveatName,
×
UNCOV
1087
                )
×
UNCOV
1088
                if !hasCustomCaveat && !middleware.readOnly {
×
UNCOV
1089
                        continue
×
1090
                }
1091

UNCOV
1092
                msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
×
UNCOV
1093
                        msg.Macaroon, middleware.customCaveatName,
×
UNCOV
1094
                )
×
UNCOV
1095

×
UNCOV
1096
                resp, err := middleware.intercept(requestID, msg)
×
UNCOV
1097

×
UNCOV
1098
                // Error during interception itself.
×
UNCOV
1099
                if err != nil {
×
1100
                        return nil, err
×
1101
                }
×
1102

1103
                // Error returned from middleware client.
UNCOV
1104
                if resp.err != nil {
×
1105
                        return nil, resp.err
×
1106
                }
×
1107

1108
                // The message was replaced, make sure the next middleware in
1109
                // line receives the updated message.
UNCOV
1110
                if !middleware.readOnly && resp.replace {
×
UNCOV
1111
                        currentMessage = resp.replacement
×
UNCOV
1112
                }
×
1113
        }
1114

UNCOV
1115
        return currentMessage, nil
×
1116
}
1117

1118
// serverStreamWrapper is a struct that wraps a server stream in a way that all
1119
// requests and responses can be intercepted individually.
1120
type serverStreamWrapper struct {
1121
        // ServerStream is the stream that's being wrapped.
1122
        grpc.ServerStream
1123

1124
        requestID uint64
1125

1126
        fullMethod string
1127

1128
        interceptor *InterceptorChain
1129
}
1130

1131
// SendMsg is called when lnd sends a message to the client. This is wrapped to
1132
// intercept streaming RPC responses.
UNCOV
1133
func (w *serverStreamWrapper) SendMsg(m interface{}) error {
×
UNCOV
1134
        newMsg, err := w.interceptor.interceptMessage(
×
UNCOV
1135
                w.ServerStream.Context(), TypeResponse, w.requestID, true,
×
UNCOV
1136
                w.fullMethod, m,
×
UNCOV
1137
        )
×
UNCOV
1138
        if err != nil {
×
1139
                return err
×
1140
        }
×
1141

UNCOV
1142
        return w.ServerStream.SendMsg(newMsg)
×
1143
}
1144

1145
// RecvMsg is called when lnd wants to receive a message from the client. This
1146
// is wrapped to intercept streaming RPC requests.
UNCOV
1147
func (w *serverStreamWrapper) RecvMsg(m interface{}) error {
×
UNCOV
1148
        err := w.ServerStream.RecvMsg(m)
×
UNCOV
1149
        if err != nil {
×
1150
                return err
×
1151
        }
×
1152

UNCOV
1153
        req, err := w.interceptor.interceptMessage(
×
UNCOV
1154
                w.ServerStream.Context(), TypeRequest, w.requestID, true,
×
UNCOV
1155
                w.fullMethod, m,
×
UNCOV
1156
        )
×
UNCOV
1157
        if err != nil {
×
1158
                return err
×
1159
        }
×
1160

UNCOV
1161
        return replaceProtoMsg(m, req)
×
1162
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc