• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13440912774

20 Feb 2025 05:14PM UTC coverage: 57.697% (-1.1%) from 58.802%
13440912774

Pull #9535

github

guggero
GitHub: remove duplicate caching

Turns out that actions/setup-go starting with @v4 also adds caching.
With that, our cache size on disk has almost doubled, leading to the
GitHub runner running out of space in certain situation.
We fix that by disabling the automated caching since we already have our
own, custom-tailored version.
Pull Request #9535: GitHub: remove duplicate caching

103519 of 179417 relevant lines covered (57.7%)

24825.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/rpcperms/interceptor.go
1
package rpcperms
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btclog/v2"
11
        grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
12
        "github.com/lightningnetwork/lnd/lnrpc"
13
        "github.com/lightningnetwork/lnd/macaroons"
14
        "github.com/lightningnetwork/lnd/monitoring"
15
        "github.com/lightningnetwork/lnd/subscribe"
16
        "google.golang.org/grpc"
17
        "gopkg.in/macaroon-bakery.v2/bakery"
18
)
19

20
// rpcState is an enum that we use to keep track of the current RPC service
21
// state. This will transition as we go from startup to unlocking the wallet,
22
// and finally fully active.
23
type rpcState uint8
24

25
const (
26
        // waitingToStart indicates that we're at the beginning of the startup
27
        // process. In a cluster environment this may mean that we're waiting to
28
        // become the leader in which case RPC calls will be disabled until
29
        // this instance has been elected as leader.
30
        waitingToStart rpcState = iota
31

32
        // walletNotCreated is the starting state if the RPC server is active,
33
        // but the wallet is not yet created. In this state we'll only allow
34
        // calls to the WalletUnlockerService.
35
        walletNotCreated
36

37
        // walletLocked indicates the RPC server is active, but the wallet is
38
        // locked. In this state we'll only allow calls to the
39
        // WalletUnlockerService.
40
        walletLocked
41

42
        // walletUnlocked means that the wallet has been unlocked, but the full
43
        // RPC server is not yet ready.
44
        walletUnlocked
45

46
        // rpcActive means that the RPC server is ready to accept calls.
47
        rpcActive
48

49
        // serverActive means that the lnd server is ready to accept calls.
50
        serverActive
51
)
52

53
var (
54
        // ErrWaitingToStart is returned if LND is still waiting to start,
55
        // possibly blocked until elected as the leader.
56
        ErrWaitingToStart = fmt.Errorf("waiting to start, RPC services not " +
57
                "available")
58

59
        // ErrNoWallet is returned if the wallet does not exist.
60
        ErrNoWallet = fmt.Errorf("wallet not created, create one to enable " +
61
                "full RPC access")
62

63
        // ErrWalletLocked is returned if the wallet is locked and any service
64
        // other than the WalletUnlocker is called.
65
        ErrWalletLocked = fmt.Errorf("wallet locked, unlock it to enable " +
66
                "full RPC access")
67

68
        // ErrWalletUnlocked is returned if the WalletUnlocker service is
69
        // called when the wallet already has been unlocked.
70
        ErrWalletUnlocked = fmt.Errorf("wallet already unlocked, " +
71
                "WalletUnlocker service is no longer available")
72

73
        // ErrRPCStarting is returned if the wallet has been unlocked but the
74
        // RPC server is not yet ready to accept calls.
75
        ErrRPCStarting = fmt.Errorf("the RPC server is in the process of " +
76
                "starting up, but not yet ready to accept calls")
77

78
        // macaroonWhitelist defines methods that we don't require macaroons to
79
        // access. We also allow these methods to be called even if not all
80
        // mandatory middlewares are registered yet. If the wallet is locked
81
        // then a middleware cannot register itself, creating an impossible
82
        // situation. Also, a middleware might want to check the state of lnd
83
        // by calling the State service before it registers itself. So we also
84
        // need to exclude those calls from the mandatory middleware check.
85
        macaroonWhitelist = map[string]struct{}{
86
                // We allow all calls to the WalletUnlocker without macaroons.
87
                "/lnrpc.WalletUnlocker/GenSeed":        {},
88
                "/lnrpc.WalletUnlocker/InitWallet":     {},
89
                "/lnrpc.WalletUnlocker/UnlockWallet":   {},
90
                "/lnrpc.WalletUnlocker/ChangePassword": {},
91

92
                // The State service must be available at all times, even
93
                // before we can check macaroons, so we whitelist it.
94
                "/lnrpc.State/SubscribeState": {},
95
                "/lnrpc.State/GetState":       {},
96
        }
97
)
98

99
// InterceptorChain is a struct that can be added to the running GRPC server,
100
// intercepting API calls. This is useful for logging, enforcing permissions,
101
// supporting middleware etc. The following diagram shows the order of each
102
// interceptor in the chain and when exactly requests/responses are intercepted
103
// and forwarded to external middleware for approval/modification. Middleware in
104
// general can only intercept gRPC requests/responses that are sent by the
105
// client with a macaroon that contains a custom caveat that is supported by one
106
// of the registered middlewares.
107
//
108
//            |
109
//            | gRPC request from client
110
//            |
111
//        +---v--------------------------------+
112
//        |   InterceptorChain                 |
113
//        +-+----------------------------------+
114
//          | Log Interceptor                  |
115
//          +----------------------------------+
116
//          | RPC State Interceptor            |
117
//          +----------------------------------+
118
//          | Macaroon Interceptor             |
119
//          +----------------------------------+--------> +---------------------+
120
//          | RPC Macaroon Middleware Handler  |<-------- | External Middleware |
121
//          +----------------------------------+          |   - modify request |
122
//          | Prometheus Interceptor           |          +---------------------+
123
//          +-+--------------------------------+
124
//            | validated gRPC request from client
125
//        +---v--------------------------------+
126
//        |   main gRPC server                 |
127
//        +---+--------------------------------+
128
//            |
129
//            | original gRPC request to client
130
//            |
131
//        +---v--------------------------------+--------> +---------------------+
132
//        |   RPC Macaroon Middleware Handler  |<-------- | External Middleware |
133
//        +---+--------------------------------+          |   - modify response |
134
//            |                                           +---------------------+
135
//            | edited gRPC request to client
136
//            v
137
type InterceptorChain struct {
138
        // lastRequestID is the ID of the last gRPC request or stream that was
139
        // intercepted by the middleware interceptor.
140
        //
141
        // NOTE: Must be used atomically!
142
        lastRequestID uint64
143

144
        // Required by the grpc-gateway/v2 library for forward compatibility.
145
        lnrpc.UnimplementedStateServer
146

147
        started sync.Once
148
        stopped sync.Once
149

150
        // state is the current RPC state of our RPC server.
151
        state rpcState
152

153
        // ntfnServer is a subscription server we use to notify clients of the
154
        // State service when the state changes.
155
        ntfnServer *subscribe.Server
156

157
        // noMacaroons should be set true if we don't want to check macaroons.
158
        noMacaroons bool
159

160
        // svc is the macaroon service used to enforce permissions in case
161
        // macaroons are used.
162
        svc *macaroons.Service
163

164
        // permissionMap is the permissions to enforce if macaroons are used.
165
        permissionMap map[string][]bakery.Op
166

167
        // rpcsLog is the logger used to log calls to the RPCs intercepted.
168
        rpcsLog btclog.Logger
169

170
        // registeredMiddleware is a slice of all macaroon permission based RPC
171
        // middleware clients that are currently registered. The
172
        // registeredMiddlewareNames can be used to find the index of a specific
173
        // interceptor within the registeredMiddleware slide using the name of
174
        // the interceptor as the key. The reason for using these two separate
175
        // structures is so that the order in which interceptors are run is
176
        // the same as the order in which they were registered.
177
        registeredMiddleware []*MiddlewareHandler
178

179
        // registeredMiddlewareNames is a map of registered middleware names
180
        // to the index at which they are stored in the registeredMiddleware
181
        // map.
182
        registeredMiddlewareNames map[string]int
183

184
        // mandatoryMiddleware is a list of all middleware that is considered to
185
        // be mandatory. If any of them is not registered then all RPC requests
186
        // (except for the macaroon white listed methods and the middleware
187
        // registration itself) are blocked. This is a security feature to make
188
        // sure that requests can't just go through unobserved/unaudited if a
189
        // middleware crashes.
190
        mandatoryMiddleware []string
191

192
        quit chan struct{}
193
        sync.RWMutex
194
}
195

196
// A compile time check to ensure that InterceptorChain fully implements the
197
// StateServer gRPC service.
198
var _ lnrpc.StateServer = (*InterceptorChain)(nil)
199

200
// NewInterceptorChain creates a new InterceptorChain.
201
func NewInterceptorChain(log btclog.Logger, noMacaroons bool,
202
        mandatoryMiddleware []string) *InterceptorChain {
×
203

×
204
        return &InterceptorChain{
×
205
                state:                     waitingToStart,
×
206
                ntfnServer:                subscribe.NewServer(),
×
207
                noMacaroons:               noMacaroons,
×
208
                permissionMap:             make(map[string][]bakery.Op),
×
209
                rpcsLog:                   log,
×
210
                registeredMiddlewareNames: make(map[string]int),
×
211
                mandatoryMiddleware:       mandatoryMiddleware,
×
212
                quit:                      make(chan struct{}),
×
213
        }
×
214
}
×
215

216
// Start starts the InterceptorChain, which is needed to start the state
217
// subscription server it powers.
218
func (r *InterceptorChain) Start() error {
×
219
        var err error
×
220
        r.started.Do(func() {
×
221
                err = r.ntfnServer.Start()
×
222
        })
×
223

224
        return err
×
225
}
226

227
// Stop stops the InterceptorChain and its internal state subscription server.
228
func (r *InterceptorChain) Stop() error {
×
229
        var err error
×
230
        r.stopped.Do(func() {
×
231
                close(r.quit)
×
232
                err = r.ntfnServer.Stop()
×
233
        })
×
234

235
        return err
×
236
}
237

238
// SetWalletNotCreated moves the RPC state from either waitingToStart to
239
// walletNotCreated.
240
func (r *InterceptorChain) SetWalletNotCreated() {
×
241
        r.Lock()
×
242
        defer r.Unlock()
×
243

×
244
        r.state = walletNotCreated
×
245
        _ = r.ntfnServer.SendUpdate(r.state)
×
246
}
×
247

248
// SetWalletLocked moves the RPC state from either walletNotCreated to
249
// walletLocked.
250
func (r *InterceptorChain) SetWalletLocked() {
×
251
        r.Lock()
×
252
        defer r.Unlock()
×
253

×
254
        r.state = walletLocked
×
255
        _ = r.ntfnServer.SendUpdate(r.state)
×
256
}
×
257

258
// SetWalletUnlocked moves the RPC state from either walletNotCreated or
259
// walletLocked to walletUnlocked.
260
func (r *InterceptorChain) SetWalletUnlocked() {
×
261
        r.Lock()
×
262
        defer r.Unlock()
×
263

×
264
        r.state = walletUnlocked
×
265
        _ = r.ntfnServer.SendUpdate(r.state)
×
266
}
×
267

268
// SetRPCActive moves the RPC state from walletUnlocked to rpcActive.
269
func (r *InterceptorChain) SetRPCActive() {
×
270
        r.Lock()
×
271
        defer r.Unlock()
×
272

×
273
        r.state = rpcActive
×
274
        _ = r.ntfnServer.SendUpdate(r.state)
×
275
}
×
276

277
// SetServerActive moves the RPC state from walletUnlocked to rpcActive.
278
func (r *InterceptorChain) SetServerActive() {
×
279
        r.Lock()
×
280
        defer r.Unlock()
×
281

×
282
        r.state = serverActive
×
283
        _ = r.ntfnServer.SendUpdate(r.state)
×
284
}
×
285

286
// rpcStateToWalletState converts rpcState to lnrpc.WalletState. Returns
287
// WAITING_TO_START and an error on conversion error.
288
func rpcStateToWalletState(state rpcState) (lnrpc.WalletState, error) {
×
289
        const defaultState = lnrpc.WalletState_WAITING_TO_START
×
290
        var walletState lnrpc.WalletState
×
291

×
292
        switch state {
×
293
        case waitingToStart:
×
294
                walletState = lnrpc.WalletState_WAITING_TO_START
×
295
        case walletNotCreated:
×
296
                walletState = lnrpc.WalletState_NON_EXISTING
×
297
        case walletLocked:
×
298
                walletState = lnrpc.WalletState_LOCKED
×
299
        case walletUnlocked:
×
300
                walletState = lnrpc.WalletState_UNLOCKED
×
301
        case rpcActive:
×
302
                walletState = lnrpc.WalletState_RPC_ACTIVE
×
303
        case serverActive:
×
304
                walletState = lnrpc.WalletState_SERVER_ACTIVE
×
305

306
        default:
×
307
                return defaultState, fmt.Errorf("unknown wallet state %v", state)
×
308
        }
309

310
        return walletState, nil
×
311
}
312

313
// SubscribeState subscribes to the state of the wallet. The current wallet
314
// state will always be delivered immediately.
315
//
316
// NOTE: Part of the StateService interface.
317
func (r *InterceptorChain) SubscribeState(_ *lnrpc.SubscribeStateRequest,
318
        stream lnrpc.State_SubscribeStateServer) error {
×
319

×
320
        sendStateUpdate := func(state rpcState) error {
×
321
                walletState, err := rpcStateToWalletState(state)
×
322
                if err != nil {
×
323
                        return err
×
324
                }
×
325

326
                return stream.Send(&lnrpc.SubscribeStateResponse{
×
327
                        State: walletState,
×
328
                })
×
329
        }
330

331
        // Subscribe to state updates.
332
        client, err := r.ntfnServer.Subscribe()
×
333
        if err != nil {
×
334
                return err
×
335
        }
×
336
        defer client.Cancel()
×
337

×
338
        // Always start by sending the current state.
×
339
        r.RLock()
×
340
        state := r.state
×
341
        r.RUnlock()
×
342

×
343
        if err := sendStateUpdate(state); err != nil {
×
344
                return err
×
345
        }
×
346

347
        for {
×
348
                select {
×
349
                case e := <-client.Updates():
×
350
                        newState := e.(rpcState)
×
351

×
352
                        // Ignore already sent state.
×
353
                        if newState == state {
×
354
                                continue
×
355
                        }
356

357
                        state = newState
×
358
                        err := sendStateUpdate(state)
×
359
                        if err != nil {
×
360
                                return err
×
361
                        }
×
362

363
                // The response stream's context for whatever reason has been
364
                // closed. If context is closed by an exceeded deadline we will
365
                // return an error.
366
                case <-stream.Context().Done():
×
367
                        if errors.Is(stream.Context().Err(), context.Canceled) {
×
368
                                return nil
×
369
                        }
×
370
                        return stream.Context().Err()
×
371

372
                case <-r.quit:
×
373
                        return fmt.Errorf("server exiting")
×
374
                }
375
        }
376
}
377

378
// GetState returns the current wallet state.
379
func (r *InterceptorChain) GetState(_ context.Context,
380
        _ *lnrpc.GetStateRequest) (*lnrpc.GetStateResponse, error) {
×
381

×
382
        r.RLock()
×
383
        state := r.state
×
384
        r.RUnlock()
×
385

×
386
        walletState, err := rpcStateToWalletState(state)
×
387
        if err != nil {
×
388
                return nil, err
×
389
        }
×
390

391
        return &lnrpc.GetStateResponse{
×
392
                State: walletState,
×
393
        }, nil
×
394
}
395

396
// AddMacaroonService adds a macaroon service to the interceptor. After this is
397
// done every RPC call made will have to pass a valid macaroon to be accepted.
398
func (r *InterceptorChain) AddMacaroonService(svc *macaroons.Service) {
×
399
        r.Lock()
×
400
        defer r.Unlock()
×
401

×
402
        r.svc = svc
×
403
}
×
404

405
// MacaroonService returns the currently registered macaroon service. This might
406
// be nil if none was registered (yet).
407
func (r *InterceptorChain) MacaroonService() *macaroons.Service {
×
408
        r.RLock()
×
409
        defer r.RUnlock()
×
410

×
411
        return r.svc
×
412
}
×
413

414
// AddPermission adds a new macaroon rule for the given method.
415
func (r *InterceptorChain) AddPermission(method string, ops []bakery.Op) error {
×
416
        r.Lock()
×
417
        defer r.Unlock()
×
418

×
419
        if _, ok := r.permissionMap[method]; ok {
×
420
                return fmt.Errorf("detected duplicate macaroon constraints "+
×
421
                        "for path: %v", method)
×
422
        }
×
423

424
        r.permissionMap[method] = ops
×
425
        return nil
×
426
}
427

428
// Permissions returns the current set of macaroon permissions.
429
func (r *InterceptorChain) Permissions() map[string][]bakery.Op {
×
430
        r.RLock()
×
431
        defer r.RUnlock()
×
432

×
433
        // Make a copy under the read lock to avoid races.
×
434
        c := make(map[string][]bakery.Op)
×
435
        for k, v := range r.permissionMap {
×
436
                s := make([]bakery.Op, len(v))
×
437
                copy(s, v)
×
438
                c[k] = s
×
439
        }
×
440

441
        return c
×
442
}
443

444
// RegisterMiddleware registers a new middleware that will handle request/
445
// response interception for all RPC messages that are initiated with a custom
446
// macaroon caveat. The name of the custom caveat a middleware is handling is
447
// also its unique identifier. Only one middleware can be registered for each
448
// custom caveat.
449
func (r *InterceptorChain) RegisterMiddleware(mw *MiddlewareHandler) error {
×
450
        r.Lock()
×
451
        defer r.Unlock()
×
452

×
453
        // The name of the middleware is the unique identifier.
×
454
        _, ok := r.registeredMiddlewareNames[mw.middlewareName]
×
455
        if ok {
×
456
                return fmt.Errorf("a middleware with the name '%s' is already "+
×
457
                        "registered", mw.middlewareName)
×
458
        }
×
459

460
        // For now, we only want one middleware per custom caveat name. If we
461
        // allowed multiple middlewares handling the same caveat there would be
462
        // a need for extra call chaining logic, and they could overwrite each
463
        // other's responses.
464
        for _, middleware := range r.registeredMiddleware {
×
465
                if middleware.customCaveatName == mw.customCaveatName {
×
466
                        return fmt.Errorf("a middleware is already registered "+
×
467
                                "for the custom caveat name '%s': %v",
×
468
                                mw.customCaveatName, middleware.middlewareName)
×
469
                }
×
470
        }
471

472
        r.registeredMiddleware = append(r.registeredMiddleware, mw)
×
473
        index := len(r.registeredMiddleware) - 1
×
474
        r.registeredMiddlewareNames[mw.middlewareName] = index
×
475

×
476
        return nil
×
477
}
478

479
// RemoveMiddleware removes the middleware that handles the given custom caveat
480
// name.
481
func (r *InterceptorChain) RemoveMiddleware(middlewareName string) {
×
482
        r.Lock()
×
483
        defer r.Unlock()
×
484

×
485
        log.Debugf("Removing middleware %s", middlewareName)
×
486

×
487
        index, ok := r.registeredMiddlewareNames[middlewareName]
×
488
        if !ok {
×
489
                return
×
490
        }
×
491
        delete(r.registeredMiddlewareNames, middlewareName)
×
492

×
493
        r.registeredMiddleware = append(
×
494
                r.registeredMiddleware[:index],
×
495
                r.registeredMiddleware[index+1:]...,
×
496
        )
×
497

×
498
        // Re-initialise the middleware look-up map with the updated indexes.
×
499
        r.registeredMiddlewareNames = make(map[string]int)
×
500
        for i, mw := range r.registeredMiddleware {
×
501
                r.registeredMiddlewareNames[mw.middlewareName] = i
×
502
        }
×
503
}
504

505
// CustomCaveatSupported makes sure a middleware that handles the given custom
506
// caveat name is registered. If none is, an error is returned, signalling to
507
// the macaroon bakery and its validator to reject macaroons that have a custom
508
// caveat with that name.
509
//
510
// NOTE: This method is part of the macaroons.CustomCaveatAcceptor interface.
511
func (r *InterceptorChain) CustomCaveatSupported(customCaveatName string) error {
×
512
        r.RLock()
×
513
        defer r.RUnlock()
×
514

×
515
        // We only accept requests with a custom caveat if we also have a
×
516
        // middleware registered that handles that custom caveat. That is
×
517
        // crucial for security! Otherwise a request with an encumbered (=has
×
518
        // restricted permissions based upon the custom caveat condition)
×
519
        // macaroon would not be validated against the limitations that the
×
520
        // custom caveat implicate. Since the map is keyed by the _name_ of the
×
521
        // middleware, we need to loop through all of them to see if one has
×
522
        // the given custom macaroon caveat name.
×
523
        for _, middleware := range r.registeredMiddleware {
×
524
                if middleware.customCaveatName == customCaveatName {
×
525
                        return nil
×
526
                }
×
527
        }
528

529
        return fmt.Errorf("cannot accept macaroon with custom caveat '%s', "+
×
530
                "no middleware registered to handle it", customCaveatName)
×
531
}
532

533
// CreateServerOpts creates the GRPC server options that can be added to a GRPC
534
// server in order to add this InterceptorChain.
535
func (r *InterceptorChain) CreateServerOpts() []grpc.ServerOption {
×
536
        var unaryInterceptors []grpc.UnaryServerInterceptor
×
537
        var strmInterceptors []grpc.StreamServerInterceptor
×
538

×
539
        // The first interceptors we'll add to the chain is our logging
×
540
        // interceptors, so we can automatically log all errors that happen
×
541
        // during RPC calls.
×
542
        unaryInterceptors = append(
×
543
                unaryInterceptors, errorLogUnaryServerInterceptor(r.rpcsLog),
×
544
        )
×
545
        strmInterceptors = append(
×
546
                strmInterceptors, errorLogStreamServerInterceptor(r.rpcsLog),
×
547
        )
×
548

×
549
        // Next we'll add our RPC state check interceptors, that will check
×
550
        // whether the attempted call is allowed in the current state.
×
551
        unaryInterceptors = append(
×
552
                unaryInterceptors, r.rpcStateUnaryServerInterceptor(),
×
553
        )
×
554
        strmInterceptors = append(
×
555
                strmInterceptors, r.rpcStateStreamServerInterceptor(),
×
556
        )
×
557

×
558
        // We'll add the macaroon interceptors. If macaroons aren't disabled,
×
559
        // then these interceptors will enforce macaroon authentication.
×
560
        unaryInterceptors = append(
×
561
                unaryInterceptors, r.MacaroonUnaryServerInterceptor(),
×
562
        )
×
563
        strmInterceptors = append(
×
564
                strmInterceptors, r.MacaroonStreamServerInterceptor(),
×
565
        )
×
566

×
567
        // Next, we'll add the interceptors for our custom macaroon caveat based
×
568
        // middleware.
×
569
        unaryInterceptors = append(
×
570
                unaryInterceptors, r.middlewareUnaryServerInterceptor(),
×
571
        )
×
572
        strmInterceptors = append(
×
573
                strmInterceptors, r.middlewareStreamServerInterceptor(),
×
574
        )
×
575

×
576
        // Get interceptors for Prometheus to gather gRPC performance metrics.
×
577
        // If monitoring is disabled, GetPromInterceptors() will return empty
×
578
        // slices.
×
579
        promUnaryInterceptors, promStrmInterceptors :=
×
580
                monitoring.GetPromInterceptors()
×
581

×
582
        // Concatenate the slices of unary and stream interceptors respectively.
×
583
        unaryInterceptors = append(unaryInterceptors, promUnaryInterceptors...)
×
584
        strmInterceptors = append(strmInterceptors, promStrmInterceptors...)
×
585

×
586
        // Create server options from the interceptors we just set up.
×
587
        chainedUnary := grpc_middleware.WithUnaryServerChain(
×
588
                unaryInterceptors...,
×
589
        )
×
590
        chainedStream := grpc_middleware.WithStreamServerChain(
×
591
                strmInterceptors...,
×
592
        )
×
593
        serverOpts := []grpc.ServerOption{chainedUnary, chainedStream}
×
594

×
595
        return serverOpts
×
596
}
×
597

598
// errorLogUnaryServerInterceptor is a simple UnaryServerInterceptor that will
599
// automatically log any errors that occur when serving a client's unary
600
// request.
601
func errorLogUnaryServerInterceptor(logger btclog.Logger) grpc.UnaryServerInterceptor {
×
602
        return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
×
603
                handler grpc.UnaryHandler) (interface{}, error) {
×
604

×
605
                resp, err := handler(ctx, req)
×
606
                if err != nil {
×
607
                        // TODO(roasbeef): also log request details?
×
608
                        logger.Errorf("[%v]: %v", info.FullMethod, err)
×
609
                }
×
610

611
                return resp, err
×
612
        }
613
}
614

615
// errorLogStreamServerInterceptor is a simple StreamServerInterceptor that
616
// will log any errors that occur while processing a client or server streaming
617
// RPC.
618
func errorLogStreamServerInterceptor(logger btclog.Logger) grpc.StreamServerInterceptor {
×
619
        return func(srv interface{}, ss grpc.ServerStream,
×
620
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
621

×
622
                err := handler(srv, ss)
×
623
                if err != nil {
×
624
                        logger.Errorf("[%v]: %v", info.FullMethod, err)
×
625
                }
×
626

627
                return err
×
628
        }
629
}
630

631
// checkMacaroon validates that the context contains the macaroon needed to
632
// invoke the given RPC method.
633
func (r *InterceptorChain) checkMacaroon(ctx context.Context,
634
        fullMethod string) error {
×
635

×
636
        // If noMacaroons is set, we'll always allow the call.
×
637
        if r.noMacaroons {
×
638
                return nil
×
639
        }
×
640

641
        // Check whether the method is whitelisted, if so we'll allow it
642
        // regardless of macaroons.
643
        _, ok := macaroonWhitelist[fullMethod]
×
644
        if ok {
×
645
                return nil
×
646
        }
×
647

648
        r.RLock()
×
649
        svc := r.svc
×
650
        r.RUnlock()
×
651

×
652
        // If the macaroon service is not yet active, we cannot allow
×
653
        // the call.
×
654
        if svc == nil {
×
655
                return fmt.Errorf("unable to determine macaroon permissions")
×
656
        }
×
657

658
        r.RLock()
×
659
        uriPermissions, ok := r.permissionMap[fullMethod]
×
660
        r.RUnlock()
×
661
        if !ok {
×
662
                return fmt.Errorf("%s: unknown permissions required for method",
×
663
                        fullMethod)
×
664
        }
×
665

666
        // Find out if there is an external validator registered for
667
        // this method. Fall back to the internal one if there isn't.
668
        validator, ok := svc.ExternalValidators[fullMethod]
×
669
        if !ok {
×
670
                validator = svc
×
671
        }
×
672

673
        // Now that we know what validator to use, let it do its work.
674
        return validator.ValidateMacaroon(ctx, uriPermissions, fullMethod)
×
675
}
676

677
// MacaroonUnaryServerInterceptor is a GRPC interceptor that checks whether the
678
// request is authorized by the included macaroons.
679
func (r *InterceptorChain) MacaroonUnaryServerInterceptor() grpc.UnaryServerInterceptor {
×
680
        return func(ctx context.Context, req interface{},
×
681
                info *grpc.UnaryServerInfo,
×
682
                handler grpc.UnaryHandler) (interface{}, error) {
×
683

×
684
                // Check macaroons.
×
685
                if err := r.checkMacaroon(ctx, info.FullMethod); err != nil {
×
686
                        return nil, err
×
687
                }
×
688

689
                return handler(ctx, req)
×
690
        }
691
}
692

693
// MacaroonStreamServerInterceptor is a GRPC interceptor that checks whether
694
// the request is authorized by the included macaroons.
695
func (r *InterceptorChain) MacaroonStreamServerInterceptor() grpc.StreamServerInterceptor {
×
696
        return func(srv interface{}, ss grpc.ServerStream,
×
697
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
698

×
699
                // Check macaroons.
×
700
                err := r.checkMacaroon(ss.Context(), info.FullMethod)
×
701
                if err != nil {
×
702
                        return err
×
703
                }
×
704

705
                return handler(srv, ss)
×
706
        }
707
}
708

709
// checkRPCState checks whether a call to the given server is allowed in the
710
// current RPC state.
711
func (r *InterceptorChain) checkRPCState(srv interface{}) error {
×
712
        // The StateService is being accessed, we allow the call regardless of
×
713
        // the current state.
×
714
        _, ok := srv.(lnrpc.StateServer)
×
715
        if ok {
×
716
                return nil
×
717
        }
×
718

719
        r.RLock()
×
720
        state := r.state
×
721
        r.RUnlock()
×
722

×
723
        switch state {
×
724
        // Do not accept any RPC calls (unless to the state service) until LND
725
        // has not started.
726
        case waitingToStart:
×
727
                return ErrWaitingToStart
×
728

729
        // If the wallet does not exists, only calls to the WalletUnlocker are
730
        // accepted.
731
        case walletNotCreated:
×
732
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
733
                if !ok {
×
734
                        return ErrNoWallet
×
735
                }
×
736

737
        // If the wallet is locked, only calls to the WalletUnlocker are
738
        // accepted.
739
        case walletLocked:
×
740
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
741
                if !ok {
×
742
                        return ErrWalletLocked
×
743
                }
×
744

745
        // If the wallet is unlocked, but the RPC not yet active, we reject.
746
        case walletUnlocked:
×
747
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
748
                if ok {
×
749
                        return ErrWalletUnlocked
×
750
                }
×
751

752
                return ErrRPCStarting
×
753

754
        // If the RPC server or lnd server is active, we allow calls to any
755
        // service except the WalletUnlocker.
756
        case rpcActive, serverActive:
×
757
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
758
                if ok {
×
759
                        return ErrWalletUnlocked
×
760
                }
×
761

762
        default:
×
763
                return fmt.Errorf("unknown RPC state: %v", state)
×
764
        }
765

766
        return nil
×
767
}
768

769
// rpcStateUnaryServerInterceptor is a GRPC interceptor that checks whether
770
// calls to the given gGRPC server is allowed in the current rpc state.
771
func (r *InterceptorChain) rpcStateUnaryServerInterceptor() grpc.UnaryServerInterceptor {
×
772
        return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
×
773
                handler grpc.UnaryHandler) (interface{}, error) {
×
774

×
775
                r.rpcsLog.Debugf("[%v] requested", info.FullMethod)
×
776

×
777
                if err := r.checkRPCState(info.Server); err != nil {
×
778
                        return nil, err
×
779
                }
×
780

781
                return handler(ctx, req)
×
782
        }
783
}
784

785
// rpcStateStreamServerInterceptor is a GRPC interceptor that checks whether
786
// calls to the given gGRPC server is allowed in the current rpc state.
787
func (r *InterceptorChain) rpcStateStreamServerInterceptor() grpc.StreamServerInterceptor {
×
788
        return func(srv interface{}, ss grpc.ServerStream,
×
789
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
790

×
791
                r.rpcsLog.Debugf("[%v] requested", info.FullMethod)
×
792

×
793
                if err := r.checkRPCState(srv); err != nil {
×
794
                        return err
×
795
                }
×
796

797
                return handler(srv, ss)
×
798
        }
799
}
800

801
// middlewareUnaryServerInterceptor is a unary gRPC interceptor that intercepts
802
// all requests and responses that are sent with a macaroon containing a custom
803
// caveat condition that is handled by registered middleware.
804
func (r *InterceptorChain) middlewareUnaryServerInterceptor() grpc.UnaryServerInterceptor {
×
805
        return func(ctx context.Context,
×
806
                req interface{}, info *grpc.UnaryServerInfo,
×
807
                handler grpc.UnaryHandler) (interface{}, error) {
×
808

×
809
                // Make sure we don't allow any requests through if one of the
×
810
                // mandatory middlewares is missing.
×
811
                fullMethod := info.FullMethod
×
812
                if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
×
813
                        return nil, err
×
814
                }
×
815

816
                // If there is no middleware registered, we don't need to
817
                // intercept anything.
818
                if !r.middlewareRegistered() {
×
819
                        return handler(ctx, req)
×
820
                }
×
821

822
                requestID := atomic.AddUint64(&r.lastRequestID, 1)
×
823
                req, err := r.interceptMessage(
×
824
                        ctx, TypeRequest, requestID, false, info.FullMethod,
×
825
                        req,
×
826
                )
×
827
                if err != nil {
×
828
                        return nil, err
×
829
                }
×
830

831
                // Call the handler, which executes the request against lnd.
832
                lndResp, lndErr := handler(ctx, req)
×
833
                if lndErr != nil {
×
834
                        // The call to lnd ended in an error and not a normal
×
835
                        // proto message response. Send the error to the
×
836
                        // interceptor as well to inform about the abnormal
×
837
                        // termination of the stream and to give the option to
×
838
                        // replace the error message with a custom one.
×
839
                        replacedErr, err := r.interceptMessage(
×
840
                                ctx, TypeResponse, requestID, false,
×
841
                                info.FullMethod, lndErr,
×
842
                        )
×
843
                        if err != nil {
×
844
                                return nil, err
×
845
                        }
×
846
                        return lndResp, replacedErr.(error)
×
847
                }
848

849
                return r.interceptMessage(
×
850
                        ctx, TypeResponse, requestID, false, info.FullMethod,
×
851
                        lndResp,
×
852
                )
×
853
        }
854
}
855

856
// middlewareStreamServerInterceptor is a streaming gRPC interceptor that
857
// intercepts all requests and responses that are sent with a macaroon
858
// containing a custom caveat condition that is handled by registered
859
// middleware.
860
func (r *InterceptorChain) middlewareStreamServerInterceptor() grpc.StreamServerInterceptor {
×
861
        return func(srv interface{},
×
862
                ss grpc.ServerStream, info *grpc.StreamServerInfo,
×
863
                handler grpc.StreamHandler) error {
×
864

×
865
                // Don't intercept the interceptor itself which is a streaming
×
866
                // RPC too!
×
867
                fullMethod := info.FullMethod
×
868
                if fullMethod == lnrpc.RegisterRPCMiddlewareURI {
×
869
                        return handler(srv, ss)
×
870
                }
×
871

872
                // Make sure we don't allow any requests through if one of the
873
                // mandatory middlewares is missing. We add this check here to
874
                // make sure the middleware registration itself can still be
875
                // called.
876
                if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
×
877
                        return err
×
878
                }
×
879

880
                // If there is no middleware registered, we don't need to
881
                // intercept anything.
882
                if !r.middlewareRegistered() {
×
883
                        return handler(srv, ss)
×
884
                }
×
885

886
                // To give the middleware a chance to accept or reject the
887
                // establishment of the stream itself (and not only when the
888
                // first message is sent on the stream), we send an intercept
889
                // request for the stream auth now:
890
                msg, err := NewStreamAuthInterceptionRequest(
×
891
                        ss.Context(), info.FullMethod,
×
892
                )
×
893
                if err != nil {
×
894
                        return err
×
895
                }
×
896

897
                requestID := atomic.AddUint64(&r.lastRequestID, 1)
×
898
                err = r.acceptStream(requestID, msg)
×
899
                if err != nil {
×
900
                        return err
×
901
                }
×
902

903
                wrappedSS := &serverStreamWrapper{
×
904
                        ServerStream: ss,
×
905
                        requestID:    requestID,
×
906
                        fullMethod:   info.FullMethod,
×
907
                        interceptor:  r,
×
908
                }
×
909

×
910
                // Call the stream handler, which will block as long as the
×
911
                // stream is alive.
×
912
                lndErr := handler(srv, wrappedSS)
×
913
                if lndErr != nil {
×
914
                        // This is an error being returned from lnd. Send it to
×
915
                        // the interceptor as well to inform about the abnormal
×
916
                        // termination of the stream and to give the option to
×
917
                        // replace the error message with a custom one.
×
918
                        replacedErr, err := r.interceptMessage(
×
919
                                ss.Context(), TypeResponse, requestID,
×
920
                                true, info.FullMethod, lndErr,
×
921
                        )
×
922
                        if err != nil {
×
923
                                return err
×
924
                        }
×
925

926
                        return replacedErr.(error)
×
927
                }
928

929
                // Normal/successful termination of the stream.
930
                return nil
×
931
        }
932
}
933

934
// checkMandatoryMiddleware makes sure that each of the middlewares declared as
935
// mandatory is currently registered.
936
func (r *InterceptorChain) checkMandatoryMiddleware(fullMethod string) error {
×
937
        r.RLock()
×
938
        defer r.RUnlock()
×
939

×
940
        // Allow calls that are whitelisted for macaroons as well, otherwise we
×
941
        // get into impossible situations where the wallet is locked but the
×
942
        // unlock call is denied because the middleware isn't registered. But
×
943
        // the middleware cannot register itself because the wallet is locked.
×
944
        if _, ok := macaroonWhitelist[fullMethod]; ok {
×
945
                return nil
×
946
        }
×
947

948
        // Not a white listed call so make sure every mandatory middleware is
949
        // currently connected to lnd.
950
        for _, name := range r.mandatoryMiddleware {
×
951
                if _, ok := r.registeredMiddlewareNames[name]; !ok {
×
952
                        return fmt.Errorf("mandatory middleware '%s' is "+
×
953
                                "currently not registered, not allowing any "+
×
954
                                "RPC calls", name)
×
955
                }
×
956
        }
957

958
        return nil
×
959
}
960

961
// middlewareRegistered returns true if there is at least one middleware
962
// currently registered.
963
func (r *InterceptorChain) middlewareRegistered() bool {
×
964
        r.RLock()
×
965
        defer r.RUnlock()
×
966

×
967
        return len(r.registeredMiddleware) > 0
×
968
}
×
969

970
// acceptStream sends an intercept request to all middlewares that have
971
// registered for it. This means either a middleware has requested read-only
972
// access or the request actually has a macaroon with a caveat the middleware
973
// registered for.
974
func (r *InterceptorChain) acceptStream(requestID uint64,
975
        msg *InterceptionRequest) error {
×
976

×
977
        r.RLock()
×
978
        defer r.RUnlock()
×
979

×
980
        for _, middleware := range r.registeredMiddleware {
×
981
                // If there is a custom caveat in the macaroon, make sure the
×
982
                // middleware registered for it. Or if a middleware registered
×
983
                // for read-only mode, it also gets the request.
×
984
                hasCustomCaveat := macaroons.HasCustomCaveat(
×
985
                        msg.Macaroon, middleware.customCaveatName,
×
986
                )
×
987
                if !hasCustomCaveat && !middleware.readOnly {
×
988
                        continue
×
989
                }
990

991
                msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
×
992
                        msg.Macaroon, middleware.customCaveatName,
×
993
                )
×
994

×
995
                resp, err := middleware.intercept(requestID, msg)
×
996

×
997
                // Error during interception itself.
×
998
                if err != nil {
×
999
                        return err
×
1000
                }
×
1001

1002
                // Error returned from middleware client.
1003
                if resp.err != nil {
×
1004
                        return resp.err
×
1005
                }
×
1006
        }
1007

1008
        return nil
×
1009
}
1010

1011
// interceptMessage sends out an intercept request for an RPC response. Since
1012
// middleware that hasn't registered for the read-only mode has the option to
1013
// overwrite/replace the message, this needs to be handled differently than the
1014
// auth path above.
1015
func (r *InterceptorChain) interceptMessage(ctx context.Context,
1016
        interceptType InterceptType, requestID uint64, isStream bool,
1017
        fullMethod string, m interface{}) (interface{}, error) {
×
1018

×
1019
        r.RLock()
×
1020
        defer r.RUnlock()
×
1021

×
1022
        currentMessage := m
×
1023
        for _, middleware := range r.registeredMiddleware {
×
1024
                msg, err := NewMessageInterceptionRequest(
×
1025
                        ctx, interceptType, isStream, fullMethod,
×
1026
                        currentMessage,
×
1027
                )
×
1028
                if err != nil {
×
1029
                        return nil, err
×
1030
                }
×
1031

1032
                // If there is a custom caveat in the macaroon, make sure the
1033
                // middleware registered for it. Or if a middleware registered
1034
                // for read-only mode, it also gets the request.
1035
                hasCustomCaveat := macaroons.HasCustomCaveat(
×
1036
                        msg.Macaroon, middleware.customCaveatName,
×
1037
                )
×
1038
                if !hasCustomCaveat && !middleware.readOnly {
×
1039
                        continue
×
1040
                }
1041

1042
                msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
×
1043
                        msg.Macaroon, middleware.customCaveatName,
×
1044
                )
×
1045

×
1046
                resp, err := middleware.intercept(requestID, msg)
×
1047

×
1048
                // Error during interception itself.
×
1049
                if err != nil {
×
1050
                        return nil, err
×
1051
                }
×
1052

1053
                // Error returned from middleware client.
1054
                if resp.err != nil {
×
1055
                        return nil, resp.err
×
1056
                }
×
1057

1058
                // The message was replaced, make sure the next middleware in
1059
                // line receives the updated message.
1060
                if !middleware.readOnly && resp.replace {
×
1061
                        currentMessage = resp.replacement
×
1062
                }
×
1063
        }
1064

1065
        return currentMessage, nil
×
1066
}
1067

1068
// serverStreamWrapper is a struct that wraps a server stream in a way that all
1069
// requests and responses can be intercepted individually.
1070
type serverStreamWrapper struct {
1071
        // ServerStream is the stream that's being wrapped.
1072
        grpc.ServerStream
1073

1074
        requestID uint64
1075

1076
        fullMethod string
1077

1078
        interceptor *InterceptorChain
1079
}
1080

1081
// SendMsg is called when lnd sends a message to the client. This is wrapped to
1082
// intercept streaming RPC responses.
1083
func (w *serverStreamWrapper) SendMsg(m interface{}) error {
×
1084
        newMsg, err := w.interceptor.interceptMessage(
×
1085
                w.ServerStream.Context(), TypeResponse, w.requestID, true,
×
1086
                w.fullMethod, m,
×
1087
        )
×
1088
        if err != nil {
×
1089
                return err
×
1090
        }
×
1091

1092
        return w.ServerStream.SendMsg(newMsg)
×
1093
}
1094

1095
// RecvMsg is called when lnd wants to receive a message from the client. This
1096
// is wrapped to intercept streaming RPC requests.
1097
func (w *serverStreamWrapper) RecvMsg(m interface{}) error {
×
1098
        err := w.ServerStream.RecvMsg(m)
×
1099
        if err != nil {
×
1100
                return err
×
1101
        }
×
1102

1103
        req, err := w.interceptor.interceptMessage(
×
1104
                w.ServerStream.Context(), TypeRequest, w.requestID, true,
×
1105
                w.fullMethod, m,
×
1106
        )
×
1107
        if err != nil {
×
1108
                return err
×
1109
        }
×
1110

1111
        return replaceProtoMsg(m, req)
×
1112
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc