• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12986279612

27 Jan 2025 09:51AM UTC coverage: 57.652% (-1.1%) from 58.788%
12986279612

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19578 existing lines in 256 files now uncovered.

103448 of 179434 relevant lines covered (57.65%)

24884.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/rpcperms/interceptor.go
1
package rpcperms
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btclog/v2"
11
        grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
12
        "github.com/lightningnetwork/lnd/lnrpc"
13
        "github.com/lightningnetwork/lnd/macaroons"
14
        "github.com/lightningnetwork/lnd/monitoring"
15
        "github.com/lightningnetwork/lnd/subscribe"
16
        "google.golang.org/grpc"
17
        "gopkg.in/macaroon-bakery.v2/bakery"
18
)
19

20
// rpcState is an enum that we use to keep track of the current RPC service
21
// state. This will transition as we go from startup to unlocking the wallet,
22
// and finally fully active.
23
type rpcState uint8
24

25
const (
26
        // waitingToStart indicates that we're at the beginning of the startup
27
        // process. In a cluster environment this may mean that we're waiting to
28
        // become the leader in which case RPC calls will be disabled until
29
        // this instance has been elected as leader.
30
        waitingToStart rpcState = iota
31

32
        // walletNotCreated is the starting state if the RPC server is active,
33
        // but the wallet is not yet created. In this state we'll only allow
34
        // calls to the WalletUnlockerService.
35
        walletNotCreated
36

37
        // walletLocked indicates the RPC server is active, but the wallet is
38
        // locked. In this state we'll only allow calls to the
39
        // WalletUnlockerService.
40
        walletLocked
41

42
        // walletUnlocked means that the wallet has been unlocked, but the full
43
        // RPC server is not yet ready.
44
        walletUnlocked
45

46
        // rpcActive means that the RPC server is ready to accept calls.
47
        rpcActive
48

49
        // serverActive means that the lnd server is ready to accept calls.
50
        serverActive
51
)
52

53
var (
54
        // ErrWaitingToStart is returned if LND is still waiting to start,
55
        // possibly blocked until elected as the leader.
56
        ErrWaitingToStart = fmt.Errorf("waiting to start, RPC services not " +
57
                "available")
58

59
        // ErrNoWallet is returned if the wallet does not exist.
60
        ErrNoWallet = fmt.Errorf("wallet not created, create one to enable " +
61
                "full RPC access")
62

63
        // ErrWalletLocked is returned if the wallet is locked and any service
64
        // other than the WalletUnlocker is called.
65
        ErrWalletLocked = fmt.Errorf("wallet locked, unlock it to enable " +
66
                "full RPC access")
67

68
        // ErrWalletUnlocked is returned if the WalletUnlocker service is
69
        // called when the wallet already has been unlocked.
70
        ErrWalletUnlocked = fmt.Errorf("wallet already unlocked, " +
71
                "WalletUnlocker service is no longer available")
72

73
        // ErrRPCStarting is returned if the wallet has been unlocked but the
74
        // RPC server is not yet ready to accept calls.
75
        ErrRPCStarting = fmt.Errorf("the RPC server is in the process of " +
76
                "starting up, but not yet ready to accept calls")
77

78
        // macaroonWhitelist defines methods that we don't require macaroons to
79
        // access. We also allow these methods to be called even if not all
80
        // mandatory middlewares are registered yet. If the wallet is locked
81
        // then a middleware cannot register itself, creating an impossible
82
        // situation. Also, a middleware might want to check the state of lnd
83
        // by calling the State service before it registers itself. So we also
84
        // need to exclude those calls from the mandatory middleware check.
85
        macaroonWhitelist = map[string]struct{}{
86
                // We allow all calls to the WalletUnlocker without macaroons.
87
                "/lnrpc.WalletUnlocker/GenSeed":        {},
88
                "/lnrpc.WalletUnlocker/InitWallet":     {},
89
                "/lnrpc.WalletUnlocker/UnlockWallet":   {},
90
                "/lnrpc.WalletUnlocker/ChangePassword": {},
91

92
                // The State service must be available at all times, even
93
                // before we can check macaroons, so we whitelist it.
94
                "/lnrpc.State/SubscribeState": {},
95
                "/lnrpc.State/GetState":       {},
96
        }
97
)
98

99
// InterceptorChain is a struct that can be added to the running GRPC server,
100
// intercepting API calls. This is useful for logging, enforcing permissions,
101
// supporting middleware etc. The following diagram shows the order of each
102
// interceptor in the chain and when exactly requests/responses are intercepted
103
// and forwarded to external middleware for approval/modification. Middleware in
104
// general can only intercept gRPC requests/responses that are sent by the
105
// client with a macaroon that contains a custom caveat that is supported by one
106
// of the registered middlewares.
107
//
108
//            |
109
//            | gRPC request from client
110
//            |
111
//        +---v--------------------------------+
112
//        |   InterceptorChain                 |
113
//        +-+----------------------------------+
114
//          | Log Interceptor                  |
115
//          +----------------------------------+
116
//          | RPC State Interceptor            |
117
//          +----------------------------------+
118
//          | Macaroon Interceptor             |
119
//          +----------------------------------+--------> +---------------------+
120
//          | RPC Macaroon Middleware Handler  |<-------- | External Middleware |
121
//          +----------------------------------+          |   - modify request |
122
//          | Prometheus Interceptor           |          +---------------------+
123
//          +-+--------------------------------+
124
//            | validated gRPC request from client
125
//        +---v--------------------------------+
126
//        |   main gRPC server                 |
127
//        +---+--------------------------------+
128
//            |
129
//            | original gRPC response to client
130
//            |
131
//        +---v--------------------------------+--------> +---------------------+
132
//        |   RPC Macaroon Middleware Handler  |<-------- | External Middleware |
133
//        +---+--------------------------------+          |   - modify response |
134
//            |                                           +---------------------+
135
//            | edited gRPC response to client
136
//            v
137
type InterceptorChain struct {
138
        // lastRequestID is the ID of the last gRPC request or stream that was
139
        // intercepted by the middleware interceptor.
140
        //
141
        // NOTE: Must be used atomically!
142
        lastRequestID uint64
143

144
        // Required by the grpc-gateway/v2 library for forward compatibility.
145
        lnrpc.UnimplementedStateServer
146

147
        started sync.Once
148
        stopped sync.Once
149

150
        // state is the current RPC state of our RPC server.
151
        state rpcState
152

153
        // ntfnServer is a subscription server we use to notify clients of the
154
        // State service when the state changes.
155
        ntfnServer *subscribe.Server
156

157
        // noMacaroons should be set true if we don't want to check macaroons.
158
        noMacaroons bool
159

160
        // svc is the macaroon service used to enforce permissions in case
161
        // macaroons are used.
162
        svc *macaroons.Service
163

164
        // permissionMap is the permissions to enforce if macaroons are used.
165
        permissionMap map[string][]bakery.Op
166

167
        // rpcsLog is the logger used to log calls to the RPCs intercepted.
168
        rpcsLog btclog.Logger
169

170
        // registeredMiddleware is a slice of all macaroon permission based RPC
171
        // middleware clients that are currently registered. The
172
        // registeredMiddlewareNames can be used to find the index of a specific
173
        // interceptor within the registeredMiddleware slice using the name of
174
        // the interceptor as the key. The reason for using these two separate
175
        // structures is so that the order in which interceptors are run is
176
        // the same as the order in which they were registered.
177
        registeredMiddleware []*MiddlewareHandler
178

179
        // registeredMiddlewareNames is a map of registered middleware names
180
        // to the index at which they are stored in the registeredMiddleware
181
        // slice.
182
        registeredMiddlewareNames map[string]int
183

184
        // mandatoryMiddleware is a list of all middleware that is considered to
185
        // be mandatory. If any of them is not registered then all RPC requests
186
        // (except for the macaroon white listed methods and the middleware
187
        // registration itself) are blocked. This is a security feature to make
188
        // sure that requests can't just go through unobserved/unaudited if a
189
        // middleware crashes.
190
        mandatoryMiddleware []string
191

192
        quit chan struct{}
193
        sync.RWMutex
194
}
195

196
// A compile time check to ensure that InterceptorChain fully implements the
197
// StateServer gRPC service.
198
var _ lnrpc.StateServer = (*InterceptorChain)(nil)
199

200
// NewInterceptorChain creates a new InterceptorChain.
201
func NewInterceptorChain(log btclog.Logger, noMacaroons bool,
UNCOV
202
        mandatoryMiddleware []string) *InterceptorChain {
×
UNCOV
203

×
UNCOV
204
        return &InterceptorChain{
×
UNCOV
205
                state:                     waitingToStart,
×
UNCOV
206
                ntfnServer:                subscribe.NewServer(),
×
UNCOV
207
                noMacaroons:               noMacaroons,
×
UNCOV
208
                permissionMap:             make(map[string][]bakery.Op),
×
UNCOV
209
                rpcsLog:                   log,
×
UNCOV
210
                registeredMiddlewareNames: make(map[string]int),
×
UNCOV
211
                mandatoryMiddleware:       mandatoryMiddleware,
×
UNCOV
212
                quit:                      make(chan struct{}),
×
UNCOV
213
        }
×
UNCOV
214
}
×
215

216
// Start starts the InterceptorChain, which is needed to start the state
217
// subscription server it powers.
UNCOV
218
func (r *InterceptorChain) Start() error {
×
UNCOV
219
        var err error
×
UNCOV
220
        r.started.Do(func() {
×
UNCOV
221
                err = r.ntfnServer.Start()
×
UNCOV
222
        })
×
223

UNCOV
224
        return err
×
225
}
226

227
// Stop stops the InterceptorChain and its internal state subscription server.
UNCOV
228
func (r *InterceptorChain) Stop() error {
×
UNCOV
229
        var err error
×
UNCOV
230
        r.stopped.Do(func() {
×
UNCOV
231
                close(r.quit)
×
UNCOV
232
                err = r.ntfnServer.Stop()
×
UNCOV
233
        })
×
234

UNCOV
235
        return err
×
236
}
237

238
// SetWalletNotCreated moves the RPC state from waitingToStart to
239
// walletNotCreated.
UNCOV
240
func (r *InterceptorChain) SetWalletNotCreated() {
×
UNCOV
241
        r.Lock()
×
UNCOV
242
        defer r.Unlock()
×
UNCOV
243

×
UNCOV
244
        r.state = walletNotCreated
×
UNCOV
245
        _ = r.ntfnServer.SendUpdate(r.state)
×
UNCOV
246
}
×
247

248
// SetWalletLocked moves the RPC state from walletNotCreated to
249
// walletLocked.
UNCOV
250
func (r *InterceptorChain) SetWalletLocked() {
×
UNCOV
251
        r.Lock()
×
UNCOV
252
        defer r.Unlock()
×
UNCOV
253

×
UNCOV
254
        r.state = walletLocked
×
UNCOV
255
        _ = r.ntfnServer.SendUpdate(r.state)
×
UNCOV
256
}
×
257

258
// SetWalletUnlocked moves the RPC state from either walletNotCreated or
259
// walletLocked to walletUnlocked.
UNCOV
260
func (r *InterceptorChain) SetWalletUnlocked() {
×
UNCOV
261
        r.Lock()
×
UNCOV
262
        defer r.Unlock()
×
UNCOV
263

×
UNCOV
264
        r.state = walletUnlocked
×
UNCOV
265
        _ = r.ntfnServer.SendUpdate(r.state)
×
UNCOV
266
}
×
267

268
// SetRPCActive moves the RPC state from walletUnlocked to rpcActive.
UNCOV
269
func (r *InterceptorChain) SetRPCActive() {
×
UNCOV
270
        r.Lock()
×
UNCOV
271
        defer r.Unlock()
×
UNCOV
272

×
UNCOV
273
        r.state = rpcActive
×
UNCOV
274
        _ = r.ntfnServer.SendUpdate(r.state)
×
UNCOV
275
}
×
276

277
// SetServerActive moves the RPC state from rpcActive to serverActive.
UNCOV
278
func (r *InterceptorChain) SetServerActive() {
×
UNCOV
279
        r.Lock()
×
UNCOV
280
        defer r.Unlock()
×
UNCOV
281

×
UNCOV
282
        r.state = serverActive
×
UNCOV
283
        _ = r.ntfnServer.SendUpdate(r.state)
×
UNCOV
284
}
×
285

286
// rpcStateToWalletState converts rpcState to lnrpc.WalletState. Returns
287
// WAITING_TO_START and an error on conversion error.
UNCOV
288
func rpcStateToWalletState(state rpcState) (lnrpc.WalletState, error) {
×
UNCOV
289
        const defaultState = lnrpc.WalletState_WAITING_TO_START
×
UNCOV
290
        var walletState lnrpc.WalletState
×
UNCOV
291

×
UNCOV
292
        switch state {
×
UNCOV
293
        case waitingToStart:
×
UNCOV
294
                walletState = lnrpc.WalletState_WAITING_TO_START
×
UNCOV
295
        case walletNotCreated:
×
UNCOV
296
                walletState = lnrpc.WalletState_NON_EXISTING
×
UNCOV
297
        case walletLocked:
×
UNCOV
298
                walletState = lnrpc.WalletState_LOCKED
×
UNCOV
299
        case walletUnlocked:
×
UNCOV
300
                walletState = lnrpc.WalletState_UNLOCKED
×
UNCOV
301
        case rpcActive:
×
UNCOV
302
                walletState = lnrpc.WalletState_RPC_ACTIVE
×
UNCOV
303
        case serverActive:
×
UNCOV
304
                walletState = lnrpc.WalletState_SERVER_ACTIVE
×
305

306
        default:
×
307
                return defaultState, fmt.Errorf("unknown wallet state %v", state)
×
308
        }
309

UNCOV
310
        return walletState, nil
×
311
}
312

313
// SubscribeState subscribes to the state of the wallet. The current wallet
314
// state will always be delivered immediately.
315
//
316
// NOTE: Part of the StateService interface.
317
func (r *InterceptorChain) SubscribeState(_ *lnrpc.SubscribeStateRequest,
UNCOV
318
        stream lnrpc.State_SubscribeStateServer) error {
×
UNCOV
319

×
UNCOV
320
        sendStateUpdate := func(state rpcState) error {
×
UNCOV
321
                walletState, err := rpcStateToWalletState(state)
×
UNCOV
322
                if err != nil {
×
323
                        return err
×
324
                }
×
325

UNCOV
326
                return stream.Send(&lnrpc.SubscribeStateResponse{
×
UNCOV
327
                        State: walletState,
×
UNCOV
328
                })
×
329
        }
330

331
        // Subscribe to state updates.
UNCOV
332
        client, err := r.ntfnServer.Subscribe()
×
UNCOV
333
        if err != nil {
×
334
                return err
×
335
        }
×
UNCOV
336
        defer client.Cancel()
×
UNCOV
337

×
UNCOV
338
        // Always start by sending the current state.
×
UNCOV
339
        r.RLock()
×
UNCOV
340
        state := r.state
×
UNCOV
341
        r.RUnlock()
×
UNCOV
342

×
UNCOV
343
        if err := sendStateUpdate(state); err != nil {
×
344
                return err
×
345
        }
×
346

UNCOV
347
        for {
×
UNCOV
348
                select {
×
UNCOV
349
                case e := <-client.Updates():
×
UNCOV
350
                        newState := e.(rpcState)
×
UNCOV
351

×
UNCOV
352
                        // Ignore already sent state.
×
UNCOV
353
                        if newState == state {
×
354
                                continue
×
355
                        }
356

UNCOV
357
                        state = newState
×
UNCOV
358
                        err := sendStateUpdate(state)
×
UNCOV
359
                        if err != nil {
×
UNCOV
360
                                return err
×
UNCOV
361
                        }
×
362

363
                // The response stream's context for whatever reason has been
364
                // closed. If context is closed by an exceeded deadline we will
365
                // return an error.
UNCOV
366
                case <-stream.Context().Done():
×
UNCOV
367
                        if errors.Is(stream.Context().Err(), context.Canceled) {
×
UNCOV
368
                                return nil
×
UNCOV
369
                        }
×
370
                        return stream.Context().Err()
×
371

372
                case <-r.quit:
×
373
                        return fmt.Errorf("server exiting")
×
374
                }
375
        }
376
}
377

378
// GetState returns the current wallet state.
379
func (r *InterceptorChain) GetState(_ context.Context,
380
        _ *lnrpc.GetStateRequest) (*lnrpc.GetStateResponse, error) {
×
381

×
382
        r.RLock()
×
383
        state := r.state
×
384
        r.RUnlock()
×
385

×
386
        walletState, err := rpcStateToWalletState(state)
×
387
        if err != nil {
×
388
                return nil, err
×
389
        }
×
390

391
        return &lnrpc.GetStateResponse{
×
392
                State: walletState,
×
393
        }, nil
×
394
}
395

396
// AddMacaroonService adds a macaroon service to the interceptor. After this is
397
// done every RPC call made will have to pass a valid macaroon to be accepted.
UNCOV
398
func (r *InterceptorChain) AddMacaroonService(svc *macaroons.Service) {
×
UNCOV
399
        r.Lock()
×
UNCOV
400
        defer r.Unlock()
×
UNCOV
401

×
UNCOV
402
        r.svc = svc
×
UNCOV
403
}
×
404

405
// MacaroonService returns the currently registered macaroon service. This might
406
// be nil if none was registered (yet).
UNCOV
407
func (r *InterceptorChain) MacaroonService() *macaroons.Service {
×
UNCOV
408
        r.RLock()
×
UNCOV
409
        defer r.RUnlock()
×
UNCOV
410

×
UNCOV
411
        return r.svc
×
UNCOV
412
}
×
413

414
// AddPermission adds a new macaroon rule for the given method.
UNCOV
415
func (r *InterceptorChain) AddPermission(method string, ops []bakery.Op) error {
×
UNCOV
416
        r.Lock()
×
UNCOV
417
        defer r.Unlock()
×
UNCOV
418

×
UNCOV
419
        if _, ok := r.permissionMap[method]; ok {
×
420
                return fmt.Errorf("detected duplicate macaroon constraints "+
×
421
                        "for path: %v", method)
×
422
        }
×
423

UNCOV
424
        r.permissionMap[method] = ops
×
UNCOV
425
        return nil
×
426
}
427

428
// Permissions returns the current set of macaroon permissions.
UNCOV
429
func (r *InterceptorChain) Permissions() map[string][]bakery.Op {
×
UNCOV
430
        r.RLock()
×
UNCOV
431
        defer r.RUnlock()
×
UNCOV
432

×
UNCOV
433
        // Make a copy under the read lock to avoid races.
×
UNCOV
434
        c := make(map[string][]bakery.Op)
×
UNCOV
435
        for k, v := range r.permissionMap {
×
UNCOV
436
                s := make([]bakery.Op, len(v))
×
UNCOV
437
                copy(s, v)
×
UNCOV
438
                c[k] = s
×
UNCOV
439
        }
×
440

UNCOV
441
        return c
×
442
}
443

444
// RegisterMiddleware registers a new middleware that will handle request/
445
// response interception for all RPC messages that are initiated with a custom
446
// macaroon caveat. The name of the custom caveat a middleware is handling is
447
// also its unique identifier. Only one middleware can be registered for each
448
// custom caveat.
UNCOV
449
func (r *InterceptorChain) RegisterMiddleware(mw *MiddlewareHandler) error {
×
UNCOV
450
        r.Lock()
×
UNCOV
451
        defer r.Unlock()
×
UNCOV
452

×
UNCOV
453
        // The name of the middleware is the unique identifier.
×
UNCOV
454
        _, ok := r.registeredMiddlewareNames[mw.middlewareName]
×
UNCOV
455
        if ok {
×
456
                return fmt.Errorf("a middleware with the name '%s' is already "+
×
457
                        "registered", mw.middlewareName)
×
458
        }
×
459

460
        // For now, we only want one middleware per custom caveat name. If we
461
        // allowed multiple middlewares handling the same caveat there would be
462
        // a need for extra call chaining logic, and they could overwrite each
463
        // other's responses.
UNCOV
464
        for _, middleware := range r.registeredMiddleware {
×
465
                if middleware.customCaveatName == mw.customCaveatName {
×
466
                        return fmt.Errorf("a middleware is already registered "+
×
467
                                "for the custom caveat name '%s': %v",
×
468
                                mw.customCaveatName, middleware.middlewareName)
×
469
                }
×
470
        }
471

UNCOV
472
        r.registeredMiddleware = append(r.registeredMiddleware, mw)
×
UNCOV
473
        index := len(r.registeredMiddleware) - 1
×
UNCOV
474
        r.registeredMiddlewareNames[mw.middlewareName] = index
×
UNCOV
475

×
UNCOV
476
        return nil
×
477
}
478

479
// RemoveMiddleware removes the middleware registered under the given middleware
480
// name.
UNCOV
481
func (r *InterceptorChain) RemoveMiddleware(middlewareName string) {
×
UNCOV
482
        r.Lock()
×
UNCOV
483
        defer r.Unlock()
×
UNCOV
484

×
UNCOV
485
        log.Debugf("Removing middleware %s", middlewareName)
×
UNCOV
486

×
UNCOV
487
        index, ok := r.registeredMiddlewareNames[middlewareName]
×
UNCOV
488
        if !ok {
×
489
                return
×
490
        }
×
UNCOV
491
        delete(r.registeredMiddlewareNames, middlewareName)
×
UNCOV
492

×
UNCOV
493
        r.registeredMiddleware = append(
×
UNCOV
494
                r.registeredMiddleware[:index],
×
UNCOV
495
                r.registeredMiddleware[index+1:]...,
×
UNCOV
496
        )
×
UNCOV
497

×
UNCOV
498
        // Re-initialise the middleware look-up map with the updated indexes.
×
UNCOV
499
        r.registeredMiddlewareNames = make(map[string]int)
×
UNCOV
500
        for i, mw := range r.registeredMiddleware {
×
501
                r.registeredMiddlewareNames[mw.middlewareName] = i
×
502
        }
×
503
}
504

505
// CustomCaveatSupported makes sure a middleware that handles the given custom
506
// caveat name is registered. If none is, an error is returned, signalling to
507
// the macaroon bakery and its validator to reject macaroons that have a custom
508
// caveat with that name.
509
//
510
// NOTE: This method is part of the macaroons.CustomCaveatAcceptor interface.
UNCOV
511
func (r *InterceptorChain) CustomCaveatSupported(customCaveatName string) error {
×
UNCOV
512
        r.RLock()
×
UNCOV
513
        defer r.RUnlock()
×
UNCOV
514

×
UNCOV
515
        // We only accept requests with a custom caveat if we also have a
×
UNCOV
516
        // middleware registered that handles that custom caveat. That is
×
UNCOV
517
        // crucial for security! Otherwise a request with an encumbered (=has
×
UNCOV
518
        // restricted permissions based upon the custom caveat condition)
×
UNCOV
519
        // macaroon would not be validated against the limitations that the
×
UNCOV
520
        // custom caveat implies. Since the map is keyed by the _name_ of the
×
UNCOV
521
        // middleware, we need to loop through all of them to see if one has
×
UNCOV
522
        // the given custom macaroon caveat name.
×
UNCOV
523
        for _, middleware := range r.registeredMiddleware {
×
UNCOV
524
                if middleware.customCaveatName == customCaveatName {
×
UNCOV
525
                        return nil
×
UNCOV
526
                }
×
527
        }
528

UNCOV
529
        return fmt.Errorf("cannot accept macaroon with custom caveat '%s', "+
×
UNCOV
530
                "no middleware registered to handle it", customCaveatName)
×
531
}
532

533
// CreateServerOpts creates the GRPC server options that can be added to a GRPC
534
// server in order to add this InterceptorChain.
UNCOV
535
func (r *InterceptorChain) CreateServerOpts() []grpc.ServerOption {
×
UNCOV
536
        var unaryInterceptors []grpc.UnaryServerInterceptor
×
UNCOV
537
        var strmInterceptors []grpc.StreamServerInterceptor
×
UNCOV
538

×
UNCOV
539
        // The first interceptors we'll add to the chain are our logging
×
UNCOV
540
        // interceptors, so we can automatically log all errors that happen
×
UNCOV
541
        // during RPC calls.
×
UNCOV
542
        unaryInterceptors = append(
×
UNCOV
543
                unaryInterceptors, errorLogUnaryServerInterceptor(r.rpcsLog),
×
UNCOV
544
        )
×
UNCOV
545
        strmInterceptors = append(
×
UNCOV
546
                strmInterceptors, errorLogStreamServerInterceptor(r.rpcsLog),
×
UNCOV
547
        )
×
UNCOV
548

×
UNCOV
549
        // Next we'll add our RPC state check interceptors, that will check
×
UNCOV
550
        // whether the attempted call is allowed in the current state.
×
UNCOV
551
        unaryInterceptors = append(
×
UNCOV
552
                unaryInterceptors, r.rpcStateUnaryServerInterceptor(),
×
UNCOV
553
        )
×
UNCOV
554
        strmInterceptors = append(
×
UNCOV
555
                strmInterceptors, r.rpcStateStreamServerInterceptor(),
×
UNCOV
556
        )
×
UNCOV
557

×
UNCOV
558
        // We'll add the macaroon interceptors. If macaroons aren't disabled,
×
UNCOV
559
        // then these interceptors will enforce macaroon authentication.
×
UNCOV
560
        unaryInterceptors = append(
×
UNCOV
561
                unaryInterceptors, r.MacaroonUnaryServerInterceptor(),
×
UNCOV
562
        )
×
UNCOV
563
        strmInterceptors = append(
×
UNCOV
564
                strmInterceptors, r.MacaroonStreamServerInterceptor(),
×
UNCOV
565
        )
×
UNCOV
566

×
UNCOV
567
        // Next, we'll add the interceptors for our custom macaroon caveat based
×
UNCOV
568
        // middleware.
×
UNCOV
569
        unaryInterceptors = append(
×
UNCOV
570
                unaryInterceptors, r.middlewareUnaryServerInterceptor(),
×
UNCOV
571
        )
×
UNCOV
572
        strmInterceptors = append(
×
UNCOV
573
                strmInterceptors, r.middlewareStreamServerInterceptor(),
×
UNCOV
574
        )
×
UNCOV
575

×
UNCOV
576
        // Get interceptors for Prometheus to gather gRPC performance metrics.
×
UNCOV
577
        // If monitoring is disabled, GetPromInterceptors() will return empty
×
UNCOV
578
        // slices.
×
UNCOV
579
        promUnaryInterceptors, promStrmInterceptors :=
×
UNCOV
580
                monitoring.GetPromInterceptors()
×
UNCOV
581

×
UNCOV
582
        // Concatenate the slices of unary and stream interceptors respectively.
×
UNCOV
583
        unaryInterceptors = append(unaryInterceptors, promUnaryInterceptors...)
×
UNCOV
584
        strmInterceptors = append(strmInterceptors, promStrmInterceptors...)
×
UNCOV
585

×
UNCOV
586
        // Create server options from the interceptors we just set up.
×
UNCOV
587
        chainedUnary := grpc_middleware.WithUnaryServerChain(
×
UNCOV
588
                unaryInterceptors...,
×
UNCOV
589
        )
×
UNCOV
590
        chainedStream := grpc_middleware.WithStreamServerChain(
×
UNCOV
591
                strmInterceptors...,
×
UNCOV
592
        )
×
UNCOV
593
        serverOpts := []grpc.ServerOption{chainedUnary, chainedStream}
×
UNCOV
594

×
UNCOV
595
        return serverOpts
×
UNCOV
596
}
×
597

598
// errorLogUnaryServerInterceptor is a simple UnaryServerInterceptor that will
599
// automatically log any errors that occur when serving a client's unary
600
// request.
UNCOV
601
func errorLogUnaryServerInterceptor(logger btclog.Logger) grpc.UnaryServerInterceptor {
×
UNCOV
602
        return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
×
UNCOV
603
                handler grpc.UnaryHandler) (interface{}, error) {
×
UNCOV
604

×
UNCOV
605
                resp, err := handler(ctx, req)
×
UNCOV
606
                if err != nil {
×
UNCOV
607
                        // TODO(roasbeef): also log request details?
×
UNCOV
608
                        logger.Errorf("[%v]: %v", info.FullMethod, err)
×
UNCOV
609
                }
×
610

UNCOV
611
                return resp, err
×
612
        }
613
}
614

615
// errorLogStreamServerInterceptor is a simple StreamServerInterceptor that
616
// will log any errors that occur while processing a client or server streaming
617
// RPC.
UNCOV
618
func errorLogStreamServerInterceptor(logger btclog.Logger) grpc.StreamServerInterceptor {
×
UNCOV
619
        return func(srv interface{}, ss grpc.ServerStream,
×
UNCOV
620
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
UNCOV
621

×
UNCOV
622
                err := handler(srv, ss)
×
UNCOV
623
                if err != nil {
×
UNCOV
624
                        logger.Errorf("[%v]: %v", info.FullMethod, err)
×
UNCOV
625
                }
×
626

UNCOV
627
                return err
×
628
        }
629
}
630

631
// checkMacaroon validates that the context contains the macaroon needed to
632
// invoke the given RPC method.
633
func (r *InterceptorChain) checkMacaroon(ctx context.Context,
UNCOV
634
        fullMethod string) error {
×
UNCOV
635

×
UNCOV
636
        // If noMacaroons is set, we'll always allow the call.
×
UNCOV
637
        if r.noMacaroons {
×
638
                return nil
×
639
        }
×
640

641
        // Check whether the method is whitelisted, if so we'll allow it
642
        // regardless of macaroons.
UNCOV
643
        _, ok := macaroonWhitelist[fullMethod]
×
UNCOV
644
        if ok {
×
UNCOV
645
                return nil
×
UNCOV
646
        }
×
647

UNCOV
648
        r.RLock()
×
UNCOV
649
        svc := r.svc
×
UNCOV
650
        r.RUnlock()
×
UNCOV
651

×
UNCOV
652
        // If the macaroon service is not yet active, we cannot allow
×
UNCOV
653
        // the call.
×
UNCOV
654
        if svc == nil {
×
655
                return fmt.Errorf("unable to determine macaroon permissions")
×
656
        }
×
657

UNCOV
658
        r.RLock()
×
UNCOV
659
        uriPermissions, ok := r.permissionMap[fullMethod]
×
UNCOV
660
        r.RUnlock()
×
UNCOV
661
        if !ok {
×
662
                return fmt.Errorf("%s: unknown permissions required for method",
×
663
                        fullMethod)
×
664
        }
×
665

666
        // Find out if there is an external validator registered for
667
        // this method. Fall back to the internal one if there isn't.
UNCOV
668
        validator, ok := svc.ExternalValidators[fullMethod]
×
UNCOV
669
        if !ok {
×
UNCOV
670
                validator = svc
×
UNCOV
671
        }
×
672

673
        // Now that we know what validator to use, let it do its work.
UNCOV
674
        return validator.ValidateMacaroon(ctx, uriPermissions, fullMethod)
×
675
}
676

677
// MacaroonUnaryServerInterceptor is a GRPC interceptor that checks whether the
678
// request is authorized by the included macaroons.
UNCOV
679
func (r *InterceptorChain) MacaroonUnaryServerInterceptor() grpc.UnaryServerInterceptor {
×
UNCOV
680
        return func(ctx context.Context, req interface{},
×
UNCOV
681
                info *grpc.UnaryServerInfo,
×
UNCOV
682
                handler grpc.UnaryHandler) (interface{}, error) {
×
UNCOV
683

×
UNCOV
684
                // Check macaroons.
×
UNCOV
685
                if err := r.checkMacaroon(ctx, info.FullMethod); err != nil {
×
UNCOV
686
                        return nil, err
×
UNCOV
687
                }
×
688

UNCOV
689
                return handler(ctx, req)
×
690
        }
691
}
692

693
// MacaroonStreamServerInterceptor is a GRPC interceptor that checks whether
694
// the request is authorized by the included macaroons.
UNCOV
695
func (r *InterceptorChain) MacaroonStreamServerInterceptor() grpc.StreamServerInterceptor {
×
UNCOV
696
        return func(srv interface{}, ss grpc.ServerStream,
×
UNCOV
697
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
UNCOV
698

×
UNCOV
699
                // Check macaroons.
×
UNCOV
700
                err := r.checkMacaroon(ss.Context(), info.FullMethod)
×
UNCOV
701
                if err != nil {
×
702
                        return err
×
703
                }
×
704

UNCOV
705
                return handler(srv, ss)
×
706
        }
707
}
708

709
// checkRPCState checks whether a call to the given server is allowed in the
710
// current RPC state.
UNCOV
711
func (r *InterceptorChain) checkRPCState(srv interface{}) error {
×
UNCOV
712
        // The StateService is being accessed, we allow the call regardless of
×
UNCOV
713
        // the current state.
×
UNCOV
714
        _, ok := srv.(lnrpc.StateServer)
×
UNCOV
715
        if ok {
×
UNCOV
716
                return nil
×
UNCOV
717
        }
×
718

UNCOV
719
        r.RLock()
×
UNCOV
720
        state := r.state
×
UNCOV
721
        r.RUnlock()
×
UNCOV
722

×
UNCOV
723
        switch state {
×
724
        // Do not accept any RPC calls (unless to the state service) until LND
725
        // has not started.
726
        case waitingToStart:
×
727
                return ErrWaitingToStart
×
728

729
        // If the wallet does not exists, only calls to the WalletUnlocker are
730
        // accepted.
UNCOV
731
        case walletNotCreated:
×
UNCOV
732
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
UNCOV
733
                if !ok {
×
734
                        return ErrNoWallet
×
735
                }
×
736

737
        // If the wallet is locked, only calls to the WalletUnlocker are
738
        // accepted.
UNCOV
739
        case walletLocked:
×
UNCOV
740
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
UNCOV
741
                if !ok {
×
742
                        return ErrWalletLocked
×
743
                }
×
744

745
        // If the wallet is unlocked, but the RPC not yet active, we reject.
746
        case walletUnlocked:
×
747
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
748
                if ok {
×
749
                        return ErrWalletUnlocked
×
750
                }
×
751

752
                return ErrRPCStarting
×
753

754
        // If the RPC server or lnd server is active, we allow calls to any
755
        // service except the WalletUnlocker.
UNCOV
756
        case rpcActive, serverActive:
×
UNCOV
757
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
UNCOV
758
                if ok {
×
759
                        return ErrWalletUnlocked
×
760
                }
×
761

762
        default:
×
763
                return fmt.Errorf("unknown RPC state: %v", state)
×
764
        }
765

UNCOV
766
        return nil
×
767
}
768

769
// rpcStateUnaryServerInterceptor is a GRPC interceptor that checks whether
770
// calls to the given gGRPC server is allowed in the current rpc state.
UNCOV
771
func (r *InterceptorChain) rpcStateUnaryServerInterceptor() grpc.UnaryServerInterceptor {
×
UNCOV
772
        return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
×
UNCOV
773
                handler grpc.UnaryHandler) (interface{}, error) {
×
UNCOV
774

×
UNCOV
775
                r.rpcsLog.Debugf("[%v] requested", info.FullMethod)
×
UNCOV
776

×
UNCOV
777
                if err := r.checkRPCState(info.Server); err != nil {
×
778
                        return nil, err
×
779
                }
×
780

UNCOV
781
                return handler(ctx, req)
×
782
        }
783
}
784

785
// rpcStateStreamServerInterceptor is a GRPC interceptor that checks whether
786
// calls to the given gGRPC server is allowed in the current rpc state.
UNCOV
787
func (r *InterceptorChain) rpcStateStreamServerInterceptor() grpc.StreamServerInterceptor {
×
UNCOV
788
        return func(srv interface{}, ss grpc.ServerStream,
×
UNCOV
789
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
UNCOV
790

×
UNCOV
791
                r.rpcsLog.Debugf("[%v] requested", info.FullMethod)
×
UNCOV
792

×
UNCOV
793
                if err := r.checkRPCState(srv); err != nil {
×
794
                        return err
×
795
                }
×
796

UNCOV
797
                return handler(srv, ss)
×
798
        }
799
}
800

801
// middlewareUnaryServerInterceptor is a unary gRPC interceptor that intercepts
802
// all requests and responses that are sent with a macaroon containing a custom
803
// caveat condition that is handled by registered middleware.
UNCOV
804
func (r *InterceptorChain) middlewareUnaryServerInterceptor() grpc.UnaryServerInterceptor {
×
UNCOV
805
        return func(ctx context.Context,
×
UNCOV
806
                req interface{}, info *grpc.UnaryServerInfo,
×
UNCOV
807
                handler grpc.UnaryHandler) (interface{}, error) {
×
UNCOV
808

×
UNCOV
809
                // Make sure we don't allow any requests through if one of the
×
UNCOV
810
                // mandatory middlewares is missing.
×
UNCOV
811
                fullMethod := info.FullMethod
×
UNCOV
812
                if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
×
813
                        return nil, err
×
814
                }
×
815

816
                // If there is no middleware registered, we don't need to
817
                // intercept anything.
UNCOV
818
                if !r.middlewareRegistered() {
×
UNCOV
819
                        return handler(ctx, req)
×
UNCOV
820
                }
×
821

UNCOV
822
                requestID := atomic.AddUint64(&r.lastRequestID, 1)
×
UNCOV
823
                req, err := r.interceptMessage(
×
UNCOV
824
                        ctx, TypeRequest, requestID, false, info.FullMethod,
×
UNCOV
825
                        req,
×
UNCOV
826
                )
×
UNCOV
827
                if err != nil {
×
828
                        return nil, err
×
829
                }
×
830

831
                // Call the handler, which executes the request against lnd.
UNCOV
832
                lndResp, lndErr := handler(ctx, req)
×
UNCOV
833
                if lndErr != nil {
×
UNCOV
834
                        // The call to lnd ended in an error and not a normal
×
UNCOV
835
                        // proto message response. Send the error to the
×
UNCOV
836
                        // interceptor as well to inform about the abnormal
×
UNCOV
837
                        // termination of the stream and to give the option to
×
UNCOV
838
                        // replace the error message with a custom one.
×
UNCOV
839
                        replacedErr, err := r.interceptMessage(
×
UNCOV
840
                                ctx, TypeResponse, requestID, false,
×
UNCOV
841
                                info.FullMethod, lndErr,
×
UNCOV
842
                        )
×
UNCOV
843
                        if err != nil {
×
844
                                return nil, err
×
845
                        }
×
UNCOV
846
                        return lndResp, replacedErr.(error)
×
847
                }
848

UNCOV
849
                return r.interceptMessage(
×
UNCOV
850
                        ctx, TypeResponse, requestID, false, info.FullMethod,
×
UNCOV
851
                        lndResp,
×
UNCOV
852
                )
×
853
        }
854
}
855

856
// middlewareStreamServerInterceptor is a streaming gRPC interceptor that
857
// intercepts all requests and responses that are sent with a macaroon
858
// containing a custom caveat condition that is handled by registered
859
// middleware.
UNCOV
860
func (r *InterceptorChain) middlewareStreamServerInterceptor() grpc.StreamServerInterceptor {
×
UNCOV
861
        return func(srv interface{},
×
UNCOV
862
                ss grpc.ServerStream, info *grpc.StreamServerInfo,
×
UNCOV
863
                handler grpc.StreamHandler) error {
×
UNCOV
864

×
UNCOV
865
                // Don't intercept the interceptor itself which is a streaming
×
UNCOV
866
                // RPC too!
×
UNCOV
867
                fullMethod := info.FullMethod
×
UNCOV
868
                if fullMethod == lnrpc.RegisterRPCMiddlewareURI {
×
UNCOV
869
                        return handler(srv, ss)
×
UNCOV
870
                }
×
871

872
                // Make sure we don't allow any requests through if one of the
873
                // mandatory middlewares is missing. We add this check here to
874
                // make sure the middleware registration itself can still be
875
                // called.
UNCOV
876
                if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
×
877
                        return err
×
878
                }
×
879

880
                // If there is no middleware registered, we don't need to
881
                // intercept anything.
UNCOV
882
                if !r.middlewareRegistered() {
×
UNCOV
883
                        return handler(srv, ss)
×
UNCOV
884
                }
×
885

886
                // To give the middleware a chance to accept or reject the
887
                // establishment of the stream itself (and not only when the
888
                // first message is sent on the stream), we send an intercept
889
                // request for the stream auth now:
UNCOV
890
                msg, err := NewStreamAuthInterceptionRequest(
×
UNCOV
891
                        ss.Context(), info.FullMethod,
×
UNCOV
892
                )
×
UNCOV
893
                if err != nil {
×
894
                        return err
×
895
                }
×
896

UNCOV
897
                requestID := atomic.AddUint64(&r.lastRequestID, 1)
×
UNCOV
898
                err = r.acceptStream(requestID, msg)
×
UNCOV
899
                if err != nil {
×
900
                        return err
×
901
                }
×
902

UNCOV
903
                wrappedSS := &serverStreamWrapper{
×
UNCOV
904
                        ServerStream: ss,
×
UNCOV
905
                        requestID:    requestID,
×
UNCOV
906
                        fullMethod:   info.FullMethod,
×
UNCOV
907
                        interceptor:  r,
×
UNCOV
908
                }
×
UNCOV
909

×
UNCOV
910
                // Call the stream handler, which will block as long as the
×
UNCOV
911
                // stream is alive.
×
UNCOV
912
                lndErr := handler(srv, wrappedSS)
×
UNCOV
913
                if lndErr != nil {
×
UNCOV
914
                        // This is an error being returned from lnd. Send it to
×
UNCOV
915
                        // the interceptor as well to inform about the abnormal
×
UNCOV
916
                        // termination of the stream and to give the option to
×
UNCOV
917
                        // replace the error message with a custom one.
×
UNCOV
918
                        replacedErr, err := r.interceptMessage(
×
UNCOV
919
                                ss.Context(), TypeResponse, requestID,
×
UNCOV
920
                                true, info.FullMethod, lndErr,
×
UNCOV
921
                        )
×
UNCOV
922
                        if err != nil {
×
923
                                return err
×
924
                        }
×
925

UNCOV
926
                        return replacedErr.(error)
×
927
                }
928

929
                // Normal/successful termination of the stream.
UNCOV
930
                return nil
×
931
        }
932
}
933

934
// checkMandatoryMiddleware makes sure that each of the middlewares declared as
935
// mandatory is currently registered.
UNCOV
936
func (r *InterceptorChain) checkMandatoryMiddleware(fullMethod string) error {
×
UNCOV
937
        r.RLock()
×
UNCOV
938
        defer r.RUnlock()
×
UNCOV
939

×
UNCOV
940
        // Allow calls that are whitelisted for macaroons as well, otherwise we
×
UNCOV
941
        // get into impossible situations where the wallet is locked but the
×
UNCOV
942
        // unlock call is denied because the middleware isn't registered. But
×
UNCOV
943
        // the middleware cannot register itself because the wallet is locked.
×
UNCOV
944
        if _, ok := macaroonWhitelist[fullMethod]; ok {
×
UNCOV
945
                return nil
×
UNCOV
946
        }
×
947

948
        // Not a white listed call so make sure every mandatory middleware is
949
        // currently connected to lnd.
UNCOV
950
        for _, name := range r.mandatoryMiddleware {
×
951
                if _, ok := r.registeredMiddlewareNames[name]; !ok {
×
952
                        return fmt.Errorf("mandatory middleware '%s' is "+
×
953
                                "currently not registered, not allowing any "+
×
954
                                "RPC calls", name)
×
955
                }
×
956
        }
957

UNCOV
958
        return nil
×
959
}
960

961
// middlewareRegistered returns true if there is at least one middleware
962
// currently registered.
UNCOV
963
func (r *InterceptorChain) middlewareRegistered() bool {
×
UNCOV
964
        r.RLock()
×
UNCOV
965
        defer r.RUnlock()
×
UNCOV
966

×
UNCOV
967
        return len(r.registeredMiddleware) > 0
×
UNCOV
968
}
×
969

970
// acceptStream sends an intercept request to all middlewares that have
971
// registered for it. This means either a middleware has requested read-only
972
// access or the request actually has a macaroon with a caveat the middleware
973
// registered for.
974
func (r *InterceptorChain) acceptStream(requestID uint64,
UNCOV
975
        msg *InterceptionRequest) error {
×
UNCOV
976

×
UNCOV
977
        r.RLock()
×
UNCOV
978
        defer r.RUnlock()
×
UNCOV
979

×
UNCOV
980
        for _, middleware := range r.registeredMiddleware {
×
UNCOV
981
                // If there is a custom caveat in the macaroon, make sure the
×
UNCOV
982
                // middleware registered for it. Or if a middleware registered
×
UNCOV
983
                // for read-only mode, it also gets the request.
×
UNCOV
984
                hasCustomCaveat := macaroons.HasCustomCaveat(
×
UNCOV
985
                        msg.Macaroon, middleware.customCaveatName,
×
UNCOV
986
                )
×
UNCOV
987
                if !hasCustomCaveat && !middleware.readOnly {
×
988
                        continue
×
989
                }
990

UNCOV
991
                msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
×
UNCOV
992
                        msg.Macaroon, middleware.customCaveatName,
×
UNCOV
993
                )
×
UNCOV
994

×
UNCOV
995
                resp, err := middleware.intercept(requestID, msg)
×
UNCOV
996

×
UNCOV
997
                // Error during interception itself.
×
UNCOV
998
                if err != nil {
×
999
                        return err
×
1000
                }
×
1001

1002
                // Error returned from middleware client.
UNCOV
1003
                if resp.err != nil {
×
1004
                        return resp.err
×
1005
                }
×
1006
        }
1007

UNCOV
1008
        return nil
×
1009
}
1010

1011
// interceptMessage sends out an intercept request for an RPC response. Since
1012
// middleware that hasn't registered for the read-only mode has the option to
1013
// overwrite/replace the message, this needs to be handled differently than the
1014
// auth path above.
1015
func (r *InterceptorChain) interceptMessage(ctx context.Context,
1016
        interceptType InterceptType, requestID uint64, isStream bool,
UNCOV
1017
        fullMethod string, m interface{}) (interface{}, error) {
×
UNCOV
1018

×
UNCOV
1019
        r.RLock()
×
UNCOV
1020
        defer r.RUnlock()
×
UNCOV
1021

×
UNCOV
1022
        currentMessage := m
×
UNCOV
1023
        for _, middleware := range r.registeredMiddleware {
×
UNCOV
1024
                msg, err := NewMessageInterceptionRequest(
×
UNCOV
1025
                        ctx, interceptType, isStream, fullMethod,
×
UNCOV
1026
                        currentMessage,
×
UNCOV
1027
                )
×
UNCOV
1028
                if err != nil {
×
1029
                        return nil, err
×
1030
                }
×
1031

1032
                // If there is a custom caveat in the macaroon, make sure the
1033
                // middleware registered for it. Or if a middleware registered
1034
                // for read-only mode, it also gets the request.
UNCOV
1035
                hasCustomCaveat := macaroons.HasCustomCaveat(
×
UNCOV
1036
                        msg.Macaroon, middleware.customCaveatName,
×
UNCOV
1037
                )
×
UNCOV
1038
                if !hasCustomCaveat && !middleware.readOnly {
×
UNCOV
1039
                        continue
×
1040
                }
1041

UNCOV
1042
                msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
×
UNCOV
1043
                        msg.Macaroon, middleware.customCaveatName,
×
UNCOV
1044
                )
×
UNCOV
1045

×
UNCOV
1046
                resp, err := middleware.intercept(requestID, msg)
×
UNCOV
1047

×
UNCOV
1048
                // Error during interception itself.
×
UNCOV
1049
                if err != nil {
×
1050
                        return nil, err
×
1051
                }
×
1052

1053
                // Error returned from middleware client.
UNCOV
1054
                if resp.err != nil {
×
1055
                        return nil, resp.err
×
1056
                }
×
1057

1058
                // The message was replaced, make sure the next middleware in
1059
                // line receives the updated message.
UNCOV
1060
                if !middleware.readOnly && resp.replace {
×
UNCOV
1061
                        currentMessage = resp.replacement
×
UNCOV
1062
                }
×
1063
        }
1064

UNCOV
1065
        return currentMessage, nil
×
1066
}
1067

1068
// serverStreamWrapper is a struct that wraps a server stream in a way that all
1069
// requests and responses can be intercepted individually.
1070
type serverStreamWrapper struct {
1071
        // ServerStream is the stream that's being wrapped.
1072
        grpc.ServerStream
1073

1074
        requestID uint64
1075

1076
        fullMethod string
1077

1078
        interceptor *InterceptorChain
1079
}
1080

1081
// SendMsg is called when lnd sends a message to the client. This is wrapped to
1082
// intercept streaming RPC responses.
UNCOV
1083
func (w *serverStreamWrapper) SendMsg(m interface{}) error {
×
UNCOV
1084
        newMsg, err := w.interceptor.interceptMessage(
×
UNCOV
1085
                w.ServerStream.Context(), TypeResponse, w.requestID, true,
×
UNCOV
1086
                w.fullMethod, m,
×
UNCOV
1087
        )
×
UNCOV
1088
        if err != nil {
×
1089
                return err
×
1090
        }
×
1091

UNCOV
1092
        return w.ServerStream.SendMsg(newMsg)
×
1093
}
1094

1095
// RecvMsg is called when lnd wants to receive a message from the client. This
1096
// is wrapped to intercept streaming RPC requests.
UNCOV
1097
func (w *serverStreamWrapper) RecvMsg(m interface{}) error {
×
UNCOV
1098
        err := w.ServerStream.RecvMsg(m)
×
UNCOV
1099
        if err != nil {
×
1100
                return err
×
1101
        }
×
1102

UNCOV
1103
        req, err := w.interceptor.interceptMessage(
×
UNCOV
1104
                w.ServerStream.Context(), TypeRequest, w.requestID, true,
×
UNCOV
1105
                w.fullMethod, m,
×
UNCOV
1106
        )
×
UNCOV
1107
        if err != nil {
×
1108
                return err
×
1109
        }
×
1110

UNCOV
1111
        return replaceProtoMsg(m, req)
×
1112
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc