• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12375116696

17 Dec 2024 02:29PM UTC coverage: 58.366% (-0.2%) from 58.595%
12375116696

Pull #8777

github

ziggie1984
docs: add release-notes
Pull Request #8777: multi: make deletion of edge atomic.

132 of 177 new or added lines in 6 files covered. (74.58%)

670 existing lines in 37 files now uncovered.

133926 of 229458 relevant lines covered (58.37%)

19223.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.59
/rpcperms/interceptor.go
1
package rpcperms
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btclog/v2"
11
        grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
12
        "github.com/lightningnetwork/lnd/lnrpc"
13
        "github.com/lightningnetwork/lnd/macaroons"
14
        "github.com/lightningnetwork/lnd/monitoring"
15
        "github.com/lightningnetwork/lnd/subscribe"
16
        "google.golang.org/grpc"
17
        "gopkg.in/macaroon-bakery.v2/bakery"
18
)
19

20
// rpcState is an enum that we use to keep track of the current RPC service
21
// state. This will transition as we go from startup to unlocking the wallet,
22
// and finally fully active.
23
type rpcState uint8
24

25
const (
26
        // waitingToStart indicates that we're at the beginning of the startup
27
        // process. In a cluster environment this may mean that we're waiting to
28
        // become the leader in which case RPC calls will be disabled until
29
        // this instance has been elected as leader.
30
        waitingToStart rpcState = iota
31

32
        // walletNotCreated is the starting state if the RPC server is active,
33
        // but the wallet is not yet created. In this state we'll only allow
34
        // calls to the WalletUnlockerService.
35
        walletNotCreated
36

37
        // walletLocked indicates the RPC server is active, but the wallet is
38
        // locked. In this state we'll only allow calls to the
39
        // WalletUnlockerService.
40
        walletLocked
41

42
        // walletUnlocked means that the wallet has been unlocked, but the full
43
        // RPC server is not yet ready.
44
        walletUnlocked
45

46
        // rpcActive means that the RPC server is ready to accept calls.
47
        rpcActive
48

49
        // serverActive means that the lnd server is ready to accept calls.
50
        serverActive
51
)
52

53
var (
54
        // ErrWaitingToStart is returned if LND is still waiting to start,
55
        // possibly blocked until elected as the leader.
56
        ErrWaitingToStart = fmt.Errorf("waiting to start, RPC services not " +
57
                "available")
58

59
        // ErrNoWallet is returned if the wallet does not exist.
60
        ErrNoWallet = fmt.Errorf("wallet not created, create one to enable " +
61
                "full RPC access")
62

63
        // ErrWalletLocked is returned if the wallet is locked and any service
64
        // other than the WalletUnlocker is called.
65
        ErrWalletLocked = fmt.Errorf("wallet locked, unlock it to enable " +
66
                "full RPC access")
67

68
        // ErrWalletUnlocked is returned if the WalletUnlocker service is
69
        // called when the wallet already has been unlocked.
70
        ErrWalletUnlocked = fmt.Errorf("wallet already unlocked, " +
71
                "WalletUnlocker service is no longer available")
72

73
        // ErrRPCStarting is returned if the wallet has been unlocked but the
74
        // RPC server is not yet ready to accept calls.
75
        ErrRPCStarting = fmt.Errorf("the RPC server is in the process of " +
76
                "starting up, but not yet ready to accept calls")
77

78
        // macaroonWhitelist defines methods that we don't require macaroons to
79
        // access. We also allow these methods to be called even if not all
80
        // mandatory middlewares are registered yet. If the wallet is locked
81
        // then a middleware cannot register itself, creating an impossible
82
        // situation. Also, a middleware might want to check the state of lnd
83
        // by calling the State service before it registers itself. So we also
84
        // need to exclude those calls from the mandatory middleware check.
85
        macaroonWhitelist = map[string]struct{}{
86
                // We allow all calls to the WalletUnlocker without macaroons.
87
                "/lnrpc.WalletUnlocker/GenSeed":        {},
88
                "/lnrpc.WalletUnlocker/InitWallet":     {},
89
                "/lnrpc.WalletUnlocker/UnlockWallet":   {},
90
                "/lnrpc.WalletUnlocker/ChangePassword": {},
91

92
                // The State service must be available at all times, even
93
                // before we can check macaroons, so we whitelist it.
94
                "/lnrpc.State/SubscribeState": {},
95
                "/lnrpc.State/GetState":       {},
96
        }
97
)
98

99
// InterceptorChain is a struct that can be added to the running GRPC server,
100
// intercepting API calls. This is useful for logging, enforcing permissions,
101
// supporting middleware etc. The following diagram shows the order of each
102
// interceptor in the chain and when exactly requests/responses are intercepted
103
// and forwarded to external middleware for approval/modification. Middleware in
104
// general can only intercept gRPC requests/responses that are sent by the
105
// client with a macaroon that contains a custom caveat that is supported by one
106
// of the registered middlewares.
107
//
108
//            |
109
//            | gRPC request from client
110
//            |
111
//        +---v--------------------------------+
112
//        |   InterceptorChain                 |
113
//        +-+----------------------------------+
114
//          | Log Interceptor                  |
115
//          +----------------------------------+
116
//          | RPC State Interceptor            |
117
//          +----------------------------------+
118
//          | Macaroon Interceptor             |
119
//          +----------------------------------+--------> +---------------------+
120
//          | RPC Macaroon Middleware Handler  |<-------- | External Middleware |
121
//          +----------------------------------+          |   - modify request |
122
//          | Prometheus Interceptor           |          +---------------------+
123
//          +-+--------------------------------+
124
//            | validated gRPC request from client
125
//        +---v--------------------------------+
126
//        |   main gRPC server                 |
127
//        +---+--------------------------------+
128
//            |
129
//            | original gRPC request to client
130
//            |
131
//        +---v--------------------------------+--------> +---------------------+
132
//        |   RPC Macaroon Middleware Handler  |<-------- | External Middleware |
133
//        +---+--------------------------------+          |   - modify response |
134
//            |                                           +---------------------+
135
//            | edited gRPC request to client
136
//            v
137
type InterceptorChain struct {
138
        // lastRequestID is the ID of the last gRPC request or stream that was
139
        // intercepted by the middleware interceptor.
140
        //
141
        // NOTE: Must be used atomically!
142
        lastRequestID uint64
143

144
        // Required by the grpc-gateway/v2 library for forward compatibility.
145
        lnrpc.UnimplementedStateServer
146

147
        started sync.Once
148
        stopped sync.Once
149

150
        // state is the current RPC state of our RPC server.
151
        state rpcState
152

153
        // ntfnServer is a subscription server we use to notify clients of the
154
        // State service when the state changes.
155
        ntfnServer *subscribe.Server
156

157
        // noMacaroons should be set true if we don't want to check macaroons.
158
        noMacaroons bool
159

160
        // svc is the macaroon service used to enforce permissions in case
161
        // macaroons are used.
162
        svc *macaroons.Service
163

164
        // permissionMap is the permissions to enforce if macaroons are used.
165
        permissionMap map[string][]bakery.Op
166

167
        // rpcsLog is the logger used to log calls to the RPCs intercepted.
168
        rpcsLog btclog.Logger
169

170
        // registeredMiddleware is a slice of all macaroon permission based RPC
171
        // middleware clients that are currently registered. The
172
        // registeredMiddlewareNames can be used to find the index of a specific
173
        // interceptor within the registeredMiddleware slide using the name of
174
        // the interceptor as the key. The reason for using these two separate
175
        // structures is so that the order in which interceptors are run is
176
        // the same as the order in which they were registered.
177
        registeredMiddleware []*MiddlewareHandler
178

179
        // registeredMiddlewareNames is a map of registered middleware names
180
        // to the index at which they are stored in the registeredMiddleware
181
        // map.
182
        registeredMiddlewareNames map[string]int
183

184
        // mandatoryMiddleware is a list of all middleware that is considered to
185
        // be mandatory. If any of them is not registered then all RPC requests
186
        // (except for the macaroon white listed methods and the middleware
187
        // registration itself) are blocked. This is a security feature to make
188
        // sure that requests can't just go through unobserved/unaudited if a
189
        // middleware crashes.
190
        mandatoryMiddleware []string
191

192
        quit chan struct{}
193
        sync.RWMutex
194
}
195

196
// A compile time check to ensure that InterceptorChain fully implements the
197
// StateServer gRPC service.
198
var _ lnrpc.StateServer = (*InterceptorChain)(nil)
199

200
// NewInterceptorChain creates a new InterceptorChain.
201
func NewInterceptorChain(log btclog.Logger, noMacaroons bool,
202
        mandatoryMiddleware []string) *InterceptorChain {
1✔
203

1✔
204
        return &InterceptorChain{
1✔
205
                state:                     waitingToStart,
1✔
206
                ntfnServer:                subscribe.NewServer(),
1✔
207
                noMacaroons:               noMacaroons,
1✔
208
                permissionMap:             make(map[string][]bakery.Op),
1✔
209
                rpcsLog:                   log,
1✔
210
                registeredMiddlewareNames: make(map[string]int),
1✔
211
                mandatoryMiddleware:       mandatoryMiddleware,
1✔
212
                quit:                      make(chan struct{}),
1✔
213
        }
1✔
214
}
1✔
215

216
// Start starts the InterceptorChain, which is needed to start the state
217
// subscription server it powers.
218
func (r *InterceptorChain) Start() error {
1✔
219
        var err error
1✔
220
        r.started.Do(func() {
2✔
221
                err = r.ntfnServer.Start()
1✔
222
        })
1✔
223

224
        return err
1✔
225
}
226

227
// Stop stops the InterceptorChain and its internal state subscription server.
228
func (r *InterceptorChain) Stop() error {
1✔
229
        var err error
1✔
230
        r.stopped.Do(func() {
2✔
231
                close(r.quit)
1✔
232
                err = r.ntfnServer.Stop()
1✔
233
        })
1✔
234

235
        return err
1✔
236
}
237

238
// SetWalletNotCreated moves the RPC state from either waitingToStart to
239
// walletNotCreated.
240
func (r *InterceptorChain) SetWalletNotCreated() {
1✔
241
        r.Lock()
1✔
242
        defer r.Unlock()
1✔
243

1✔
244
        r.state = walletNotCreated
1✔
245
        _ = r.ntfnServer.SendUpdate(r.state)
1✔
246
}
1✔
247

248
// SetWalletLocked moves the RPC state from either walletNotCreated to
249
// walletLocked.
250
func (r *InterceptorChain) SetWalletLocked() {
1✔
251
        r.Lock()
1✔
252
        defer r.Unlock()
1✔
253

1✔
254
        r.state = walletLocked
1✔
255
        _ = r.ntfnServer.SendUpdate(r.state)
1✔
256
}
1✔
257

258
// SetWalletUnlocked moves the RPC state from either walletNotCreated or
259
// walletLocked to walletUnlocked.
260
func (r *InterceptorChain) SetWalletUnlocked() {
1✔
261
        r.Lock()
1✔
262
        defer r.Unlock()
1✔
263

1✔
264
        r.state = walletUnlocked
1✔
265
        _ = r.ntfnServer.SendUpdate(r.state)
1✔
266
}
1✔
267

268
// SetRPCActive moves the RPC state from walletUnlocked to rpcActive.
269
func (r *InterceptorChain) SetRPCActive() {
1✔
270
        r.Lock()
1✔
271
        defer r.Unlock()
1✔
272

1✔
273
        r.state = rpcActive
1✔
274
        _ = r.ntfnServer.SendUpdate(r.state)
1✔
275
}
1✔
276

277
// SetServerActive moves the RPC state from walletUnlocked to rpcActive.
278
func (r *InterceptorChain) SetServerActive() {
1✔
279
        r.Lock()
1✔
280
        defer r.Unlock()
1✔
281

1✔
282
        r.state = serverActive
1✔
283
        _ = r.ntfnServer.SendUpdate(r.state)
1✔
284
}
1✔
285

286
// rpcStateToWalletState converts rpcState to lnrpc.WalletState. Returns
287
// WAITING_TO_START and an error on conversion error.
288
func rpcStateToWalletState(state rpcState) (lnrpc.WalletState, error) {
1✔
289
        const defaultState = lnrpc.WalletState_WAITING_TO_START
1✔
290
        var walletState lnrpc.WalletState
1✔
291

1✔
292
        switch state {
1✔
293
        case waitingToStart:
×
294
                walletState = lnrpc.WalletState_WAITING_TO_START
×
295
        case walletNotCreated:
1✔
296
                walletState = lnrpc.WalletState_NON_EXISTING
1✔
297
        case walletLocked:
1✔
298
                walletState = lnrpc.WalletState_LOCKED
1✔
299
        case walletUnlocked:
1✔
300
                walletState = lnrpc.WalletState_UNLOCKED
1✔
301
        case rpcActive:
1✔
302
                walletState = lnrpc.WalletState_RPC_ACTIVE
1✔
303
        case serverActive:
1✔
304
                walletState = lnrpc.WalletState_SERVER_ACTIVE
1✔
305

306
        default:
×
307
                return defaultState, fmt.Errorf("unknown wallet state %v", state)
×
308
        }
309

310
        return walletState, nil
1✔
311
}
312

313
// SubscribeState subscribes to the state of the wallet. The current wallet
314
// state will always be delivered immediately.
315
//
316
// NOTE: Part of the StateService interface.
317
func (r *InterceptorChain) SubscribeState(_ *lnrpc.SubscribeStateRequest,
318
        stream lnrpc.State_SubscribeStateServer) error {
1✔
319

1✔
320
        sendStateUpdate := func(state rpcState) error {
2✔
321
                walletState, err := rpcStateToWalletState(state)
1✔
322
                if err != nil {
1✔
323
                        return err
×
324
                }
×
325

326
                return stream.Send(&lnrpc.SubscribeStateResponse{
1✔
327
                        State: walletState,
1✔
328
                })
1✔
329
        }
330

331
        // Subscribe to state updates.
332
        client, err := r.ntfnServer.Subscribe()
1✔
333
        if err != nil {
1✔
334
                return err
×
335
        }
×
336
        defer client.Cancel()
1✔
337

1✔
338
        // Always start by sending the current state.
1✔
339
        r.RLock()
1✔
340
        state := r.state
1✔
341
        r.RUnlock()
1✔
342

1✔
343
        if err := sendStateUpdate(state); err != nil {
1✔
344
                return err
×
345
        }
×
346

347
        for {
2✔
348
                select {
1✔
349
                case e := <-client.Updates():
1✔
350
                        newState := e.(rpcState)
1✔
351

1✔
352
                        // Ignore already sent state.
1✔
353
                        if newState == state {
1✔
354
                                continue
×
355
                        }
356

357
                        state = newState
1✔
358
                        err := sendStateUpdate(state)
1✔
359
                        if err != nil {
1✔
UNCOV
360
                                return err
×
UNCOV
361
                        }
×
362

363
                // The response stream's context for whatever reason has been
364
                // closed. If context is closed by an exceeded deadline we will
365
                // return an error.
366
                case <-stream.Context().Done():
1✔
367
                        if errors.Is(stream.Context().Err(), context.Canceled) {
2✔
368
                                return nil
1✔
369
                        }
1✔
370
                        return stream.Context().Err()
×
371

372
                case <-r.quit:
×
373
                        return fmt.Errorf("server exiting")
×
374
                }
375
        }
376
}
377

378
// GetState returns the current wallet state.
379
func (r *InterceptorChain) GetState(_ context.Context,
380
        _ *lnrpc.GetStateRequest) (*lnrpc.GetStateResponse, error) {
×
381

×
382
        r.RLock()
×
383
        state := r.state
×
384
        r.RUnlock()
×
385

×
386
        walletState, err := rpcStateToWalletState(state)
×
387
        if err != nil {
×
388
                return nil, err
×
389
        }
×
390

391
        return &lnrpc.GetStateResponse{
×
392
                State: walletState,
×
393
        }, nil
×
394
}
395

396
// AddMacaroonService adds a macaroon service to the interceptor. After this is
397
// done every RPC call made will have to pass a valid macaroon to be accepted.
398
func (r *InterceptorChain) AddMacaroonService(svc *macaroons.Service) {
1✔
399
        r.Lock()
1✔
400
        defer r.Unlock()
1✔
401

1✔
402
        r.svc = svc
1✔
403
}
1✔
404

405
// MacaroonService returns the currently registered macaroon service. This might
406
// be nil if none was registered (yet).
407
func (r *InterceptorChain) MacaroonService() *macaroons.Service {
1✔
408
        r.RLock()
1✔
409
        defer r.RUnlock()
1✔
410

1✔
411
        return r.svc
1✔
412
}
1✔
413

414
// AddPermission adds a new macaroon rule for the given method.
415
func (r *InterceptorChain) AddPermission(method string, ops []bakery.Op) error {
1✔
416
        r.Lock()
1✔
417
        defer r.Unlock()
1✔
418

1✔
419
        if _, ok := r.permissionMap[method]; ok {
1✔
420
                return fmt.Errorf("detected duplicate macaroon constraints "+
×
421
                        "for path: %v", method)
×
422
        }
×
423

424
        r.permissionMap[method] = ops
1✔
425
        return nil
1✔
426
}
427

428
// Permissions returns the current set of macaroon permissions.
429
func (r *InterceptorChain) Permissions() map[string][]bakery.Op {
1✔
430
        r.RLock()
1✔
431
        defer r.RUnlock()
1✔
432

1✔
433
        // Make a copy under the read lock to avoid races.
1✔
434
        c := make(map[string][]bakery.Op)
1✔
435
        for k, v := range r.permissionMap {
2✔
436
                s := make([]bakery.Op, len(v))
1✔
437
                copy(s, v)
1✔
438
                c[k] = s
1✔
439
        }
1✔
440

441
        return c
1✔
442
}
443

444
// RegisterMiddleware registers a new middleware that will handle request/
445
// response interception for all RPC messages that are initiated with a custom
446
// macaroon caveat. The name of the custom caveat a middleware is handling is
447
// also its unique identifier. Only one middleware can be registered for each
448
// custom caveat.
449
func (r *InterceptorChain) RegisterMiddleware(mw *MiddlewareHandler) error {
1✔
450
        r.Lock()
1✔
451
        defer r.Unlock()
1✔
452

1✔
453
        // The name of the middleware is the unique identifier.
1✔
454
        _, ok := r.registeredMiddlewareNames[mw.middlewareName]
1✔
455
        if ok {
1✔
456
                return fmt.Errorf("a middleware with the name '%s' is already "+
×
457
                        "registered", mw.middlewareName)
×
458
        }
×
459

460
        // For now, we only want one middleware per custom caveat name. If we
461
        // allowed multiple middlewares handling the same caveat there would be
462
        // a need for extra call chaining logic, and they could overwrite each
463
        // other's responses.
464
        for _, middleware := range r.registeredMiddleware {
1✔
465
                if middleware.customCaveatName == mw.customCaveatName {
×
466
                        return fmt.Errorf("a middleware is already registered "+
×
467
                                "for the custom caveat name '%s': %v",
×
468
                                mw.customCaveatName, middleware.middlewareName)
×
469
                }
×
470
        }
471

472
        r.registeredMiddleware = append(r.registeredMiddleware, mw)
1✔
473
        index := len(r.registeredMiddleware) - 1
1✔
474
        r.registeredMiddlewareNames[mw.middlewareName] = index
1✔
475

1✔
476
        return nil
1✔
477
}
478

479
// RemoveMiddleware removes the middleware that handles the given custom caveat
480
// name.
481
func (r *InterceptorChain) RemoveMiddleware(middlewareName string) {
1✔
482
        r.Lock()
1✔
483
        defer r.Unlock()
1✔
484

1✔
485
        log.Debugf("Removing middleware %s", middlewareName)
1✔
486

1✔
487
        index, ok := r.registeredMiddlewareNames[middlewareName]
1✔
488
        if !ok {
1✔
489
                return
×
490
        }
×
491
        delete(r.registeredMiddlewareNames, middlewareName)
1✔
492

1✔
493
        r.registeredMiddleware = append(
1✔
494
                r.registeredMiddleware[:index],
1✔
495
                r.registeredMiddleware[index+1:]...,
1✔
496
        )
1✔
497

1✔
498
        // Re-initialise the middleware look-up map with the updated indexes.
1✔
499
        r.registeredMiddlewareNames = make(map[string]int)
1✔
500
        for i, mw := range r.registeredMiddleware {
1✔
501
                r.registeredMiddlewareNames[mw.middlewareName] = i
×
502
        }
×
503
}
504

505
// CustomCaveatSupported makes sure a middleware that handles the given custom
506
// caveat name is registered. If none is, an error is returned, signalling to
507
// the macaroon bakery and its validator to reject macaroons that have a custom
508
// caveat with that name.
509
//
510
// NOTE: This method is part of the macaroons.CustomCaveatAcceptor interface.
511
func (r *InterceptorChain) CustomCaveatSupported(customCaveatName string) error {
1✔
512
        r.RLock()
1✔
513
        defer r.RUnlock()
1✔
514

1✔
515
        // We only accept requests with a custom caveat if we also have a
1✔
516
        // middleware registered that handles that custom caveat. That is
1✔
517
        // crucial for security! Otherwise a request with an encumbered (=has
1✔
518
        // restricted permissions based upon the custom caveat condition)
1✔
519
        // macaroon would not be validated against the limitations that the
1✔
520
        // custom caveat implicate. Since the map is keyed by the _name_ of the
1✔
521
        // middleware, we need to loop through all of them to see if one has
1✔
522
        // the given custom macaroon caveat name.
1✔
523
        for _, middleware := range r.registeredMiddleware {
2✔
524
                if middleware.customCaveatName == customCaveatName {
2✔
525
                        return nil
1✔
526
                }
1✔
527
        }
528

529
        return fmt.Errorf("cannot accept macaroon with custom caveat '%s', "+
1✔
530
                "no middleware registered to handle it", customCaveatName)
1✔
531
}
532

533
// CreateServerOpts creates the GRPC server options that can be added to a GRPC
534
// server in order to add this InterceptorChain.
535
func (r *InterceptorChain) CreateServerOpts() []grpc.ServerOption {
1✔
536
        var unaryInterceptors []grpc.UnaryServerInterceptor
1✔
537
        var strmInterceptors []grpc.StreamServerInterceptor
1✔
538

1✔
539
        // The first interceptors we'll add to the chain is our logging
1✔
540
        // interceptors, so we can automatically log all errors that happen
1✔
541
        // during RPC calls.
1✔
542
        unaryInterceptors = append(
1✔
543
                unaryInterceptors, errorLogUnaryServerInterceptor(r.rpcsLog),
1✔
544
        )
1✔
545
        strmInterceptors = append(
1✔
546
                strmInterceptors, errorLogStreamServerInterceptor(r.rpcsLog),
1✔
547
        )
1✔
548

1✔
549
        // Next we'll add our RPC state check interceptors, that will check
1✔
550
        // whether the attempted call is allowed in the current state.
1✔
551
        unaryInterceptors = append(
1✔
552
                unaryInterceptors, r.rpcStateUnaryServerInterceptor(),
1✔
553
        )
1✔
554
        strmInterceptors = append(
1✔
555
                strmInterceptors, r.rpcStateStreamServerInterceptor(),
1✔
556
        )
1✔
557

1✔
558
        // We'll add the macaroon interceptors. If macaroons aren't disabled,
1✔
559
        // then these interceptors will enforce macaroon authentication.
1✔
560
        unaryInterceptors = append(
1✔
561
                unaryInterceptors, r.MacaroonUnaryServerInterceptor(),
1✔
562
        )
1✔
563
        strmInterceptors = append(
1✔
564
                strmInterceptors, r.MacaroonStreamServerInterceptor(),
1✔
565
        )
1✔
566

1✔
567
        // Next, we'll add the interceptors for our custom macaroon caveat based
1✔
568
        // middleware.
1✔
569
        unaryInterceptors = append(
1✔
570
                unaryInterceptors, r.middlewareUnaryServerInterceptor(),
1✔
571
        )
1✔
572
        strmInterceptors = append(
1✔
573
                strmInterceptors, r.middlewareStreamServerInterceptor(),
1✔
574
        )
1✔
575

1✔
576
        // Get interceptors for Prometheus to gather gRPC performance metrics.
1✔
577
        // If monitoring is disabled, GetPromInterceptors() will return empty
1✔
578
        // slices.
1✔
579
        promUnaryInterceptors, promStrmInterceptors :=
1✔
580
                monitoring.GetPromInterceptors()
1✔
581

1✔
582
        // Concatenate the slices of unary and stream interceptors respectively.
1✔
583
        unaryInterceptors = append(unaryInterceptors, promUnaryInterceptors...)
1✔
584
        strmInterceptors = append(strmInterceptors, promStrmInterceptors...)
1✔
585

1✔
586
        // Create server options from the interceptors we just set up.
1✔
587
        chainedUnary := grpc_middleware.WithUnaryServerChain(
1✔
588
                unaryInterceptors...,
1✔
589
        )
1✔
590
        chainedStream := grpc_middleware.WithStreamServerChain(
1✔
591
                strmInterceptors...,
1✔
592
        )
1✔
593
        serverOpts := []grpc.ServerOption{chainedUnary, chainedStream}
1✔
594

1✔
595
        return serverOpts
1✔
596
}
1✔
597

598
// errorLogUnaryServerInterceptor is a simple UnaryServerInterceptor that will
599
// automatically log any errors that occur when serving a client's unary
600
// request.
601
func errorLogUnaryServerInterceptor(logger btclog.Logger) grpc.UnaryServerInterceptor {
1✔
602
        return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
1✔
603
                handler grpc.UnaryHandler) (interface{}, error) {
2✔
604

1✔
605
                resp, err := handler(ctx, req)
1✔
606
                if err != nil {
2✔
607
                        // TODO(roasbeef): also log request details?
1✔
608
                        logger.Errorf("[%v]: %v", info.FullMethod, err)
1✔
609
                }
1✔
610

611
                return resp, err
1✔
612
        }
613
}
614

615
// errorLogStreamServerInterceptor is a simple StreamServerInterceptor that
616
// will log any errors that occur while processing a client or server streaming
617
// RPC.
618
func errorLogStreamServerInterceptor(logger btclog.Logger) grpc.StreamServerInterceptor {
1✔
619
        return func(srv interface{}, ss grpc.ServerStream,
1✔
620
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
2✔
621

1✔
622
                err := handler(srv, ss)
1✔
623
                if err != nil {
2✔
624
                        logger.Errorf("[%v]: %v", info.FullMethod, err)
1✔
625
                }
1✔
626

627
                return err
1✔
628
        }
629
}
630

631
// checkMacaroon validates that the context contains the macaroon needed to
632
// invoke the given RPC method.
633
func (r *InterceptorChain) checkMacaroon(ctx context.Context,
634
        fullMethod string) error {
1✔
635

1✔
636
        // If noMacaroons is set, we'll always allow the call.
1✔
637
        if r.noMacaroons {
1✔
638
                return nil
×
639
        }
×
640

641
        // Check whether the method is whitelisted, if so we'll allow it
642
        // regardless of macaroons.
643
        _, ok := macaroonWhitelist[fullMethod]
1✔
644
        if ok {
2✔
645
                return nil
1✔
646
        }
1✔
647

648
        r.RLock()
1✔
649
        svc := r.svc
1✔
650
        r.RUnlock()
1✔
651

1✔
652
        // If the macaroon service is not yet active, we cannot allow
1✔
653
        // the call.
1✔
654
        if svc == nil {
1✔
655
                return fmt.Errorf("unable to determine macaroon permissions")
×
656
        }
×
657

658
        r.RLock()
1✔
659
        uriPermissions, ok := r.permissionMap[fullMethod]
1✔
660
        r.RUnlock()
1✔
661
        if !ok {
1✔
662
                return fmt.Errorf("%s: unknown permissions required for method",
×
663
                        fullMethod)
×
664
        }
×
665

666
        // Find out if there is an external validator registered for
667
        // this method. Fall back to the internal one if there isn't.
668
        validator, ok := svc.ExternalValidators[fullMethod]
1✔
669
        if !ok {
2✔
670
                validator = svc
1✔
671
        }
1✔
672

673
        // Now that we know what validator to use, let it do its work.
674
        return validator.ValidateMacaroon(ctx, uriPermissions, fullMethod)
1✔
675
}
676

677
// MacaroonUnaryServerInterceptor is a GRPC interceptor that checks whether the
678
// request is authorized by the included macaroons.
679
func (r *InterceptorChain) MacaroonUnaryServerInterceptor() grpc.UnaryServerInterceptor {
1✔
680
        return func(ctx context.Context, req interface{},
1✔
681
                info *grpc.UnaryServerInfo,
1✔
682
                handler grpc.UnaryHandler) (interface{}, error) {
2✔
683

1✔
684
                // Check macaroons.
1✔
685
                if err := r.checkMacaroon(ctx, info.FullMethod); err != nil {
2✔
686
                        return nil, err
1✔
687
                }
1✔
688

689
                return handler(ctx, req)
1✔
690
        }
691
}
692

693
// MacaroonStreamServerInterceptor is a GRPC interceptor that checks whether
694
// the request is authorized by the included macaroons.
695
func (r *InterceptorChain) MacaroonStreamServerInterceptor() grpc.StreamServerInterceptor {
1✔
696
        return func(srv interface{}, ss grpc.ServerStream,
1✔
697
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
2✔
698

1✔
699
                // Check macaroons.
1✔
700
                err := r.checkMacaroon(ss.Context(), info.FullMethod)
1✔
701
                if err != nil {
1✔
702
                        return err
×
703
                }
×
704

705
                return handler(srv, ss)
1✔
706
        }
707
}
708

709
// checkRPCState checks whether a call to the given server is allowed in the
710
// current RPC state.
711
func (r *InterceptorChain) checkRPCState(srv interface{}) error {
1✔
712
        // The StateService is being accessed, we allow the call regardless of
1✔
713
        // the current state.
1✔
714
        _, ok := srv.(lnrpc.StateServer)
1✔
715
        if ok {
2✔
716
                return nil
1✔
717
        }
1✔
718

719
        r.RLock()
1✔
720
        state := r.state
1✔
721
        r.RUnlock()
1✔
722

1✔
723
        switch state {
1✔
724
        // Do not accept any RPC calls (unless to the state service) until LND
725
        // has not started.
726
        case waitingToStart:
×
727
                return ErrWaitingToStart
×
728

729
        // If the wallet does not exists, only calls to the WalletUnlocker are
730
        // accepted.
731
        case walletNotCreated:
1✔
732
                _, ok := srv.(lnrpc.WalletUnlockerServer)
1✔
733
                if !ok {
1✔
734
                        return ErrNoWallet
×
735
                }
×
736

737
        // If the wallet is locked, only calls to the WalletUnlocker are
738
        // accepted.
739
        case walletLocked:
1✔
740
                _, ok := srv.(lnrpc.WalletUnlockerServer)
1✔
741
                if !ok {
1✔
742
                        return ErrWalletLocked
×
743
                }
×
744

745
        // If the wallet is unlocked, but the RPC not yet active, we reject.
746
        case walletUnlocked:
×
747
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
748
                if ok {
×
749
                        return ErrWalletUnlocked
×
750
                }
×
751

752
                return ErrRPCStarting
×
753

754
        // If the RPC server or lnd server is active, we allow calls to any
755
        // service except the WalletUnlocker.
756
        case rpcActive, serverActive:
1✔
757
                _, ok := srv.(lnrpc.WalletUnlockerServer)
1✔
758
                if ok {
1✔
759
                        return ErrWalletUnlocked
×
760
                }
×
761

762
        default:
×
763
                return fmt.Errorf("unknown RPC state: %v", state)
×
764
        }
765

766
        return nil
1✔
767
}
768

769
// rpcStateUnaryServerInterceptor is a GRPC interceptor that checks whether
770
// calls to the given gGRPC server is allowed in the current rpc state.
771
func (r *InterceptorChain) rpcStateUnaryServerInterceptor() grpc.UnaryServerInterceptor {
1✔
772
        return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
1✔
773
                handler grpc.UnaryHandler) (interface{}, error) {
2✔
774

1✔
775
                r.rpcsLog.Debugf("[%v] requested", info.FullMethod)
1✔
776

1✔
777
                if err := r.checkRPCState(info.Server); err != nil {
1✔
778
                        return nil, err
×
779
                }
×
780

781
                return handler(ctx, req)
1✔
782
        }
783
}
784

785
// rpcStateStreamServerInterceptor is a GRPC interceptor that checks whether
786
// calls to the given gGRPC server is allowed in the current rpc state.
787
func (r *InterceptorChain) rpcStateStreamServerInterceptor() grpc.StreamServerInterceptor {
1✔
788
        return func(srv interface{}, ss grpc.ServerStream,
1✔
789
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
2✔
790

1✔
791
                r.rpcsLog.Debugf("[%v] requested", info.FullMethod)
1✔
792

1✔
793
                if err := r.checkRPCState(srv); err != nil {
1✔
794
                        return err
×
795
                }
×
796

797
                return handler(srv, ss)
1✔
798
        }
799
}
800

801
// middlewareUnaryServerInterceptor is a unary gRPC interceptor that intercepts
802
// all requests and responses that are sent with a macaroon containing a custom
803
// caveat condition that is handled by registered middleware.
804
func (r *InterceptorChain) middlewareUnaryServerInterceptor() grpc.UnaryServerInterceptor {
1✔
805
        return func(ctx context.Context,
1✔
806
                req interface{}, info *grpc.UnaryServerInfo,
1✔
807
                handler grpc.UnaryHandler) (interface{}, error) {
2✔
808

1✔
809
                // Make sure we don't allow any requests through if one of the
1✔
810
                // mandatory middlewares is missing.
1✔
811
                fullMethod := info.FullMethod
1✔
812
                if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
1✔
813
                        return nil, err
×
814
                }
×
815

816
                // If there is no middleware registered, we don't need to
817
                // intercept anything.
818
                if !r.middlewareRegistered() {
2✔
819
                        return handler(ctx, req)
1✔
820
                }
1✔
821

822
                requestID := atomic.AddUint64(&r.lastRequestID, 1)
1✔
823
                req, err := r.interceptMessage(
1✔
824
                        ctx, TypeRequest, requestID, false, info.FullMethod,
1✔
825
                        req,
1✔
826
                )
1✔
827
                if err != nil {
1✔
828
                        return nil, err
×
829
                }
×
830

831
                // Call the handler, which executes the request against lnd.
832
                lndResp, lndErr := handler(ctx, req)
1✔
833
                if lndErr != nil {
2✔
834
                        // The call to lnd ended in an error and not a normal
1✔
835
                        // proto message response. Send the error to the
1✔
836
                        // interceptor as well to inform about the abnormal
1✔
837
                        // termination of the stream and to give the option to
1✔
838
                        // replace the error message with a custom one.
1✔
839
                        replacedErr, err := r.interceptMessage(
1✔
840
                                ctx, TypeResponse, requestID, false,
1✔
841
                                info.FullMethod, lndErr,
1✔
842
                        )
1✔
843
                        if err != nil {
1✔
844
                                return nil, err
×
845
                        }
×
846
                        return lndResp, replacedErr.(error)
1✔
847
                }
848

849
                return r.interceptMessage(
1✔
850
                        ctx, TypeResponse, requestID, false, info.FullMethod,
1✔
851
                        lndResp,
1✔
852
                )
1✔
853
        }
854
}
855

856
// middlewareStreamServerInterceptor is a streaming gRPC interceptor that
857
// intercepts all requests and responses that are sent with a macaroon
858
// containing a custom caveat condition that is handled by registered
859
// middleware.
860
func (r *InterceptorChain) middlewareStreamServerInterceptor() grpc.StreamServerInterceptor {
1✔
861
        return func(srv interface{},
1✔
862
                ss grpc.ServerStream, info *grpc.StreamServerInfo,
1✔
863
                handler grpc.StreamHandler) error {
2✔
864

1✔
865
                // Don't intercept the interceptor itself which is a streaming
1✔
866
                // RPC too!
1✔
867
                fullMethod := info.FullMethod
1✔
868
                if fullMethod == lnrpc.RegisterRPCMiddlewareURI {
2✔
869
                        return handler(srv, ss)
1✔
870
                }
1✔
871

872
                // Make sure we don't allow any requests through if one of the
873
                // mandatory middlewares is missing. We add this check here to
874
                // make sure the middleware registration itself can still be
875
                // called.
876
                if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
1✔
877
                        return err
×
878
                }
×
879

880
                // If there is no middleware registered, we don't need to
881
                // intercept anything.
882
                if !r.middlewareRegistered() {
2✔
883
                        return handler(srv, ss)
1✔
884
                }
1✔
885

886
                // To give the middleware a chance to accept or reject the
887
                // establishment of the stream itself (and not only when the
888
                // first message is sent on the stream), we send an intercept
889
                // request for the stream auth now:
890
                msg, err := NewStreamAuthInterceptionRequest(
1✔
891
                        ss.Context(), info.FullMethod,
1✔
892
                )
1✔
893
                if err != nil {
1✔
894
                        return err
×
895
                }
×
896

897
                requestID := atomic.AddUint64(&r.lastRequestID, 1)
1✔
898
                err = r.acceptStream(requestID, msg)
1✔
899
                if err != nil {
1✔
900
                        return err
×
901
                }
×
902

903
                wrappedSS := &serverStreamWrapper{
1✔
904
                        ServerStream: ss,
1✔
905
                        requestID:    requestID,
1✔
906
                        fullMethod:   info.FullMethod,
1✔
907
                        interceptor:  r,
1✔
908
                }
1✔
909

1✔
910
                // Call the stream handler, which will block as long as the
1✔
911
                // stream is alive.
1✔
912
                lndErr := handler(srv, wrappedSS)
1✔
913
                if lndErr != nil {
2✔
914
                        // This is an error being returned from lnd. Send it to
1✔
915
                        // the interceptor as well to inform about the abnormal
1✔
916
                        // termination of the stream and to give the option to
1✔
917
                        // replace the error message with a custom one.
1✔
918
                        replacedErr, err := r.interceptMessage(
1✔
919
                                ss.Context(), TypeResponse, requestID,
1✔
920
                                true, info.FullMethod, lndErr,
1✔
921
                        )
1✔
922
                        if err != nil {
1✔
923
                                return err
×
924
                        }
×
925

926
                        return replacedErr.(error)
1✔
927
                }
928

929
                // Normal/successful termination of the stream.
930
                return nil
1✔
931
        }
932
}
933

934
// checkMandatoryMiddleware makes sure that each of the middlewares declared as
935
// mandatory is currently registered.
936
func (r *InterceptorChain) checkMandatoryMiddleware(fullMethod string) error {
1✔
937
        r.RLock()
1✔
938
        defer r.RUnlock()
1✔
939

1✔
940
        // Allow calls that are whitelisted for macaroons as well, otherwise we
1✔
941
        // get into impossible situations where the wallet is locked but the
1✔
942
        // unlock call is denied because the middleware isn't registered. But
1✔
943
        // the middleware cannot register itself because the wallet is locked.
1✔
944
        if _, ok := macaroonWhitelist[fullMethod]; ok {
2✔
945
                return nil
1✔
946
        }
1✔
947

948
        // Not a white listed call so make sure every mandatory middleware is
949
        // currently connected to lnd.
950
        for _, name := range r.mandatoryMiddleware {
1✔
951
                if _, ok := r.registeredMiddlewareNames[name]; !ok {
×
952
                        return fmt.Errorf("mandatory middleware '%s' is "+
×
953
                                "currently not registered, not allowing any "+
×
954
                                "RPC calls", name)
×
955
                }
×
956
        }
957

958
        return nil
1✔
959
}
960

961
// middlewareRegistered returns true if there is at least one middleware
962
// currently registered.
963
func (r *InterceptorChain) middlewareRegistered() bool {
1✔
964
        r.RLock()
1✔
965
        defer r.RUnlock()
1✔
966

1✔
967
        return len(r.registeredMiddleware) > 0
1✔
968
}
1✔
969

970
// acceptStream sends an intercept request to all middlewares that have
971
// registered for it. This means either a middleware has requested read-only
972
// access or the request actually has a macaroon with a caveat the middleware
973
// registered for.
974
func (r *InterceptorChain) acceptStream(requestID uint64,
975
        msg *InterceptionRequest) error {
1✔
976

1✔
977
        r.RLock()
1✔
978
        defer r.RUnlock()
1✔
979

1✔
980
        for _, middleware := range r.registeredMiddleware {
2✔
981
                // If there is a custom caveat in the macaroon, make sure the
1✔
982
                // middleware registered for it. Or if a middleware registered
1✔
983
                // for read-only mode, it also gets the request.
1✔
984
                hasCustomCaveat := macaroons.HasCustomCaveat(
1✔
985
                        msg.Macaroon, middleware.customCaveatName,
1✔
986
                )
1✔
987
                if !hasCustomCaveat && !middleware.readOnly {
1✔
988
                        continue
×
989
                }
990

991
                msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
1✔
992
                        msg.Macaroon, middleware.customCaveatName,
1✔
993
                )
1✔
994

1✔
995
                resp, err := middleware.intercept(requestID, msg)
1✔
996

1✔
997
                // Error during interception itself.
1✔
998
                if err != nil {
1✔
999
                        return err
×
1000
                }
×
1001

1002
                // Error returned from middleware client.
1003
                if resp.err != nil {
1✔
1004
                        return resp.err
×
1005
                }
×
1006
        }
1007

1008
        return nil
1✔
1009
}
1010

1011
// interceptMessage sends out an intercept request for an RPC response. Since
1012
// middleware that hasn't registered for the read-only mode has the option to
1013
// overwrite/replace the message, this needs to be handled differently than the
1014
// auth path above.
1015
func (r *InterceptorChain) interceptMessage(ctx context.Context,
1016
        interceptType InterceptType, requestID uint64, isStream bool,
1017
        fullMethod string, m interface{}) (interface{}, error) {
1✔
1018

1✔
1019
        r.RLock()
1✔
1020
        defer r.RUnlock()
1✔
1021

1✔
1022
        currentMessage := m
1✔
1023
        for _, middleware := range r.registeredMiddleware {
2✔
1024
                msg, err := NewMessageInterceptionRequest(
1✔
1025
                        ctx, interceptType, isStream, fullMethod,
1✔
1026
                        currentMessage,
1✔
1027
                )
1✔
1028
                if err != nil {
1✔
1029
                        return nil, err
×
1030
                }
×
1031

1032
                // If there is a custom caveat in the macaroon, make sure the
1033
                // middleware registered for it. Or if a middleware registered
1034
                // for read-only mode, it also gets the request.
1035
                hasCustomCaveat := macaroons.HasCustomCaveat(
1✔
1036
                        msg.Macaroon, middleware.customCaveatName,
1✔
1037
                )
1✔
1038
                if !hasCustomCaveat && !middleware.readOnly {
2✔
1039
                        continue
1✔
1040
                }
1041

1042
                msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
1✔
1043
                        msg.Macaroon, middleware.customCaveatName,
1✔
1044
                )
1✔
1045

1✔
1046
                resp, err := middleware.intercept(requestID, msg)
1✔
1047

1✔
1048
                // Error during interception itself.
1✔
1049
                if err != nil {
1✔
1050
                        return nil, err
×
1051
                }
×
1052

1053
                // Error returned from middleware client.
1054
                if resp.err != nil {
1✔
1055
                        return nil, resp.err
×
1056
                }
×
1057

1058
                // The message was replaced, make sure the next middleware in
1059
                // line receives the updated message.
1060
                if !middleware.readOnly && resp.replace {
2✔
1061
                        currentMessage = resp.replacement
1✔
1062
                }
1✔
1063
        }
1064

1065
        return currentMessage, nil
1✔
1066
}
1067

1068
// serverStreamWrapper is a struct that wraps a server stream in a way that all
1069
// requests and responses can be intercepted individually.
1070
type serverStreamWrapper struct {
1071
        // ServerStream is the stream that's being wrapped.
1072
        grpc.ServerStream
1073

1074
        requestID uint64
1075

1076
        fullMethod string
1077

1078
        interceptor *InterceptorChain
1079
}
1080

1081
// SendMsg is called when lnd sends a message to the client. This is wrapped to
1082
// intercept streaming RPC responses.
1083
func (w *serverStreamWrapper) SendMsg(m interface{}) error {
1✔
1084
        newMsg, err := w.interceptor.interceptMessage(
1✔
1085
                w.ServerStream.Context(), TypeResponse, w.requestID, true,
1✔
1086
                w.fullMethod, m,
1✔
1087
        )
1✔
1088
        if err != nil {
1✔
1089
                return err
×
1090
        }
×
1091

1092
        return w.ServerStream.SendMsg(newMsg)
1✔
1093
}
1094

1095
// RecvMsg is called when lnd wants to receive a message from the client. This
1096
// is wrapped to intercept streaming RPC requests.
1097
func (w *serverStreamWrapper) RecvMsg(m interface{}) error {
1✔
1098
        err := w.ServerStream.RecvMsg(m)
1✔
1099
        if err != nil {
1✔
1100
                return err
×
1101
        }
×
1102

1103
        req, err := w.interceptor.interceptMessage(
1✔
1104
                w.ServerStream.Context(), TypeRequest, w.requestID, true,
1✔
1105
                w.fullMethod, m,
1✔
1106
        )
1✔
1107
        if err != nil {
1✔
1108
                return err
×
1109
        }
×
1110

1111
        return replaceProtoMsg(m, req)
1✔
1112
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc