• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13536249039

26 Feb 2025 03:42AM UTC coverage: 57.462% (-1.4%) from 58.835%
13536249039

Pull #8453

github

Roasbeef
peer: update chooseDeliveryScript to gen script if needed

In this commit, we update `chooseDeliveryScript` to generate a new
script if needed. This allows us to fold in a few other lines that
always followed this function into this expanded function.

The tests have been updated accordingly.
Pull Request #8453: [4/4] - multi: integrate new rbf coop close FSM into the existing peer flow

275 of 1318 new or added lines in 22 files covered. (20.86%)

19521 existing lines in 257 files now uncovered.

103858 of 180741 relevant lines covered (57.46%)

24750.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/rpcperms/interceptor.go
1
package rpcperms
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btclog/v2"
11
        grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
12
        "github.com/lightningnetwork/lnd/lnrpc"
13
        "github.com/lightningnetwork/lnd/macaroons"
14
        "github.com/lightningnetwork/lnd/monitoring"
15
        "github.com/lightningnetwork/lnd/subscribe"
16
        "google.golang.org/grpc"
17
        "gopkg.in/macaroon-bakery.v2/bakery"
18
)
19

20
// rpcState is an enum that we use to keep track of the current RPC service
21
// state. This will transition as we go from startup to unlocking the wallet,
22
// and finally fully active.
23
type rpcState uint8
24

25
const (
26
        // waitingToStart indicates that we're at the beginning of the startup
27
        // process. In a cluster environment this may mean that we're waiting to
28
        // become the leader in which case RPC calls will be disabled until
29
        // this instance has been elected as leader.
30
        waitingToStart rpcState = iota
31

32
        // walletNotCreated is the starting state if the RPC server is active,
33
        // but the wallet is not yet created. In this state we'll only allow
34
        // calls to the WalletUnlockerService.
35
        walletNotCreated
36

37
        // walletLocked indicates the RPC server is active, but the wallet is
38
        // locked. In this state we'll only allow calls to the
39
        // WalletUnlockerService.
40
        walletLocked
41

42
        // walletUnlocked means that the wallet has been unlocked, but the full
43
        // RPC server is not yet ready.
44
        walletUnlocked
45

46
        // rpcActive means that the RPC server is ready to accept calls.
47
        rpcActive
48

49
        // serverActive means that the lnd server is ready to accept calls.
50
        serverActive
51
)
52

53
var (
54
        // ErrWaitingToStart is returned if LND is still waiting to start,
55
        // possibly blocked until elected as the leader.
56
        ErrWaitingToStart = fmt.Errorf("waiting to start, RPC services not " +
57
                "available")
58

59
        // ErrNoWallet is returned if the wallet does not exist.
60
        ErrNoWallet = fmt.Errorf("wallet not created, create one to enable " +
61
                "full RPC access")
62

63
        // ErrWalletLocked is returned if the wallet is locked and any service
64
        // other than the WalletUnlocker is called.
65
        ErrWalletLocked = fmt.Errorf("wallet locked, unlock it to enable " +
66
                "full RPC access")
67

68
        // ErrWalletUnlocked is returned if the WalletUnlocker service is
69
        // called when the wallet already has been unlocked.
70
        ErrWalletUnlocked = fmt.Errorf("wallet already unlocked, " +
71
                "WalletUnlocker service is no longer available")
72

73
        // ErrRPCStarting is returned if the wallet has been unlocked but the
74
        // RPC server is not yet ready to accept calls.
75
        ErrRPCStarting = fmt.Errorf("the RPC server is in the process of " +
76
                "starting up, but not yet ready to accept calls")
77

78
        // macaroonWhitelist defines methods that we don't require macaroons to
79
        // access. We also allow these methods to be called even if not all
80
        // mandatory middlewares are registered yet. If the wallet is locked
81
        // then a middleware cannot register itself, creating an impossible
82
        // situation. Also, a middleware might want to check the state of lnd
83
        // by calling the State service before it registers itself. So we also
84
        // need to exclude those calls from the mandatory middleware check.
85
        macaroonWhitelist = map[string]struct{}{
86
                // We allow all calls to the WalletUnlocker without macaroons.
87
                "/lnrpc.WalletUnlocker/GenSeed":        {},
88
                "/lnrpc.WalletUnlocker/InitWallet":     {},
89
                "/lnrpc.WalletUnlocker/UnlockWallet":   {},
90
                "/lnrpc.WalletUnlocker/ChangePassword": {},
91

92
                // The State service must be available at all times, even
93
                // before we can check macaroons, so we whitelist it.
94
                "/lnrpc.State/SubscribeState": {},
95
                "/lnrpc.State/GetState":       {},
96
        }
97
)
98

99
// InterceptorChain is a struct that can be added to the running GRPC server,
100
// intercepting API calls. This is useful for logging, enforcing permissions,
101
// supporting middleware etc. The following diagram shows the order of each
102
// interceptor in the chain and when exactly requests/responses are intercepted
103
// and forwarded to external middleware for approval/modification. Middleware in
104
// general can only intercept gRPC requests/responses that are sent by the
105
// client with a macaroon that contains a custom caveat that is supported by one
106
// of the registered middlewares.
107
//
108
//            |
109
//            | gRPC request from client
110
//            |
111
//        +---v--------------------------------+
112
//        |   InterceptorChain                 |
113
//        +-+----------------------------------+
114
//          | Log Interceptor                  |
115
//          +----------------------------------+
116
//          | RPC State Interceptor            |
117
//          +----------------------------------+
118
//          | Macaroon Interceptor             |
119
//          +----------------------------------+--------> +---------------------+
120
//          | RPC Macaroon Middleware Handler  |<-------- | External Middleware |
121
//          +----------------------------------+          |   - modify request |
122
//          | Prometheus Interceptor           |          +---------------------+
123
//          +-+--------------------------------+
124
//            | validated gRPC request from client
125
//        +---v--------------------------------+
126
//        |   main gRPC server                 |
127
//        +---+--------------------------------+
128
//            |
129
//            | original gRPC request to client
130
//            |
131
//        +---v--------------------------------+--------> +---------------------+
132
//        |   RPC Macaroon Middleware Handler  |<-------- | External Middleware |
133
//        +---+--------------------------------+          |   - modify response |
134
//            |                                           +---------------------+
135
//            | edited gRPC request to client
136
//            v
137
type InterceptorChain struct {
138
        // lastRequestID is the ID of the last gRPC request or stream that was
139
        // intercepted by the middleware interceptor.
140
        //
141
        // NOTE: Must be used atomically!
142
        lastRequestID uint64
143

144
        // Required by the grpc-gateway/v2 library for forward compatibility.
145
        lnrpc.UnimplementedStateServer
146

147
        started sync.Once
148
        stopped sync.Once
149

150
        // state is the current RPC state of our RPC server.
151
        state rpcState
152

153
        // ntfnServer is a subscription server we use to notify clients of the
154
        // State service when the state changes.
155
        ntfnServer *subscribe.Server
156

157
        // noMacaroons should be set true if we don't want to check macaroons.
158
        noMacaroons bool
159

160
        // svc is the macaroon service used to enforce permissions in case
161
        // macaroons are used.
162
        svc *macaroons.Service
163

164
        // permissionMap is the permissions to enforce if macaroons are used.
165
        permissionMap map[string][]bakery.Op
166

167
        // rpcsLog is the logger used to log calls to the RPCs intercepted.
168
        rpcsLog btclog.Logger
169

170
        // registeredMiddleware is a slice of all macaroon permission based RPC
171
        // middleware clients that are currently registered. The
172
        // registeredMiddlewareNames can be used to find the index of a specific
173
        // interceptor within the registeredMiddleware slide using the name of
174
        // the interceptor as the key. The reason for using these two separate
175
        // structures is so that the order in which interceptors are run is
176
        // the same as the order in which they were registered.
177
        registeredMiddleware []*MiddlewareHandler
178

179
        // registeredMiddlewareNames is a map of registered middleware names
180
        // to the index at which they are stored in the registeredMiddleware
181
        // map.
182
        registeredMiddlewareNames map[string]int
183

184
        // mandatoryMiddleware is a list of all middleware that is considered to
185
        // be mandatory. If any of them is not registered then all RPC requests
186
        // (except for the macaroon white listed methods and the middleware
187
        // registration itself) are blocked. This is a security feature to make
188
        // sure that requests can't just go through unobserved/unaudited if a
189
        // middleware crashes.
190
        mandatoryMiddleware []string
191

192
        quit chan struct{}
193
        sync.RWMutex
194
}
195

196
// A compile time check to ensure that InterceptorChain fully implements the
197
// StateServer gRPC service.
198
var _ lnrpc.StateServer = (*InterceptorChain)(nil)
199

200
// NewInterceptorChain creates a new InterceptorChain.
201
func NewInterceptorChain(log btclog.Logger, noMacaroons bool,
UNCOV
202
        mandatoryMiddleware []string) *InterceptorChain {
×
UNCOV
203

×
UNCOV
204
        return &InterceptorChain{
×
UNCOV
205
                state:                     waitingToStart,
×
UNCOV
206
                ntfnServer:                subscribe.NewServer(),
×
UNCOV
207
                noMacaroons:               noMacaroons,
×
UNCOV
208
                permissionMap:             make(map[string][]bakery.Op),
×
UNCOV
209
                rpcsLog:                   log,
×
UNCOV
210
                registeredMiddlewareNames: make(map[string]int),
×
UNCOV
211
                mandatoryMiddleware:       mandatoryMiddleware,
×
UNCOV
212
                quit:                      make(chan struct{}),
×
UNCOV
213
        }
×
UNCOV
214
}
×
215

216
// Start starts the InterceptorChain, which is needed to start the state
217
// subscription server it powers.
UNCOV
218
func (r *InterceptorChain) Start() error {
×
UNCOV
219
        var err error
×
UNCOV
220
        r.started.Do(func() {
×
UNCOV
221
                err = r.ntfnServer.Start()
×
UNCOV
222
        })
×
223

UNCOV
224
        return err
×
225
}
226

227
// Stop stops the InterceptorChain and its internal state subscription server.
UNCOV
228
func (r *InterceptorChain) Stop() error {
×
UNCOV
229
        var err error
×
UNCOV
230
        r.stopped.Do(func() {
×
UNCOV
231
                close(r.quit)
×
UNCOV
232
                err = r.ntfnServer.Stop()
×
UNCOV
233
        })
×
234

UNCOV
235
        return err
×
236
}
237

238
// SetWalletNotCreated moves the RPC state from either waitingToStart to
239
// walletNotCreated.
UNCOV
240
func (r *InterceptorChain) SetWalletNotCreated() {
×
UNCOV
241
        r.Lock()
×
UNCOV
242
        defer r.Unlock()
×
UNCOV
243

×
UNCOV
244
        r.state = walletNotCreated
×
UNCOV
245
        _ = r.ntfnServer.SendUpdate(r.state)
×
UNCOV
246
}
×
247

248
// SetWalletLocked moves the RPC state from either walletNotCreated to
249
// walletLocked.
UNCOV
250
func (r *InterceptorChain) SetWalletLocked() {
×
UNCOV
251
        r.Lock()
×
UNCOV
252
        defer r.Unlock()
×
UNCOV
253

×
UNCOV
254
        r.state = walletLocked
×
UNCOV
255
        _ = r.ntfnServer.SendUpdate(r.state)
×
UNCOV
256
}
×
257

258
// SetWalletUnlocked moves the RPC state from either walletNotCreated or
259
// walletLocked to walletUnlocked.
UNCOV
260
func (r *InterceptorChain) SetWalletUnlocked() {
×
UNCOV
261
        r.Lock()
×
UNCOV
262
        defer r.Unlock()
×
UNCOV
263

×
UNCOV
264
        r.state = walletUnlocked
×
UNCOV
265
        _ = r.ntfnServer.SendUpdate(r.state)
×
UNCOV
266
}
×
267

268
// SetRPCActive moves the RPC state from walletUnlocked to rpcActive.
UNCOV
269
func (r *InterceptorChain) SetRPCActive() {
×
UNCOV
270
        r.Lock()
×
UNCOV
271
        defer r.Unlock()
×
UNCOV
272

×
UNCOV
273
        r.state = rpcActive
×
UNCOV
274
        _ = r.ntfnServer.SendUpdate(r.state)
×
UNCOV
275
}
×
276

277
// SetServerActive moves the RPC state from walletUnlocked to rpcActive.
UNCOV
278
func (r *InterceptorChain) SetServerActive() {
×
UNCOV
279
        r.Lock()
×
UNCOV
280
        defer r.Unlock()
×
UNCOV
281

×
UNCOV
282
        r.state = serverActive
×
UNCOV
283
        _ = r.ntfnServer.SendUpdate(r.state)
×
UNCOV
284
}
×
285

286
// rpcStateToWalletState converts rpcState to lnrpc.WalletState. Returns
287
// WAITING_TO_START and an error on conversion error.
UNCOV
288
func rpcStateToWalletState(state rpcState) (lnrpc.WalletState, error) {
×
UNCOV
289
        const defaultState = lnrpc.WalletState_WAITING_TO_START
×
UNCOV
290
        var walletState lnrpc.WalletState
×
UNCOV
291

×
UNCOV
292
        switch state {
×
UNCOV
293
        case waitingToStart:
×
UNCOV
294
                walletState = lnrpc.WalletState_WAITING_TO_START
×
UNCOV
295
        case walletNotCreated:
×
UNCOV
296
                walletState = lnrpc.WalletState_NON_EXISTING
×
UNCOV
297
        case walletLocked:
×
UNCOV
298
                walletState = lnrpc.WalletState_LOCKED
×
UNCOV
299
        case walletUnlocked:
×
UNCOV
300
                walletState = lnrpc.WalletState_UNLOCKED
×
UNCOV
301
        case rpcActive:
×
UNCOV
302
                walletState = lnrpc.WalletState_RPC_ACTIVE
×
UNCOV
303
        case serverActive:
×
UNCOV
304
                walletState = lnrpc.WalletState_SERVER_ACTIVE
×
305

306
        default:
×
307
                return defaultState, fmt.Errorf("unknown wallet state %v", state)
×
308
        }
309

UNCOV
310
        return walletState, nil
×
311
}
312

313
// SubscribeState subscribes to the state of the wallet. The current wallet
314
// state will always be delivered immediately.
315
//
316
// NOTE: Part of the StateService interface.
317
func (r *InterceptorChain) SubscribeState(_ *lnrpc.SubscribeStateRequest,
UNCOV
318
        stream lnrpc.State_SubscribeStateServer) error {
×
UNCOV
319

×
UNCOV
320
        sendStateUpdate := func(state rpcState) error {
×
UNCOV
321
                walletState, err := rpcStateToWalletState(state)
×
UNCOV
322
                if err != nil {
×
323
                        return err
×
324
                }
×
325

UNCOV
326
                return stream.Send(&lnrpc.SubscribeStateResponse{
×
UNCOV
327
                        State: walletState,
×
UNCOV
328
                })
×
329
        }
330

331
        // Subscribe to state updates.
UNCOV
332
        client, err := r.ntfnServer.Subscribe()
×
UNCOV
333
        if err != nil {
×
334
                return err
×
335
        }
×
UNCOV
336
        defer client.Cancel()
×
UNCOV
337

×
UNCOV
338
        // Always start by sending the current state.
×
UNCOV
339
        r.RLock()
×
UNCOV
340
        state := r.state
×
UNCOV
341
        r.RUnlock()
×
UNCOV
342

×
UNCOV
343
        if err := sendStateUpdate(state); err != nil {
×
344
                return err
×
345
        }
×
346

UNCOV
347
        for {
×
UNCOV
348
                select {
×
UNCOV
349
                case e := <-client.Updates():
×
UNCOV
350
                        newState := e.(rpcState)
×
UNCOV
351

×
UNCOV
352
                        // Ignore already sent state.
×
UNCOV
353
                        if newState == state {
×
354
                                continue
×
355
                        }
356

UNCOV
357
                        state = newState
×
UNCOV
358
                        err := sendStateUpdate(state)
×
UNCOV
359
                        if err != nil {
×
UNCOV
360
                                return err
×
UNCOV
361
                        }
×
362

363
                // The response stream's context for whatever reason has been
364
                // closed. If context is closed by an exceeded deadline we will
365
                // return an error.
UNCOV
366
                case <-stream.Context().Done():
×
UNCOV
367
                        if errors.Is(stream.Context().Err(), context.Canceled) {
×
UNCOV
368
                                return nil
×
UNCOV
369
                        }
×
370
                        return stream.Context().Err()
×
371

372
                case <-r.quit:
×
373
                        return fmt.Errorf("server exiting")
×
374
                }
375
        }
376
}
377

378
// GetState returns the current wallet state.
379
func (r *InterceptorChain) GetState(_ context.Context,
380
        _ *lnrpc.GetStateRequest) (*lnrpc.GetStateResponse, error) {
×
381

×
382
        r.RLock()
×
383
        state := r.state
×
384
        r.RUnlock()
×
385

×
386
        walletState, err := rpcStateToWalletState(state)
×
387
        if err != nil {
×
388
                return nil, err
×
389
        }
×
390

391
        return &lnrpc.GetStateResponse{
×
392
                State: walletState,
×
393
        }, nil
×
394
}
395

396
// AddMacaroonService adds a macaroon service to the interceptor. After this is
397
// done every RPC call made will have to pass a valid macaroon to be accepted.
UNCOV
398
func (r *InterceptorChain) AddMacaroonService(svc *macaroons.Service) {
×
UNCOV
399
        r.Lock()
×
UNCOV
400
        defer r.Unlock()
×
UNCOV
401

×
UNCOV
402
        r.svc = svc
×
UNCOV
403
}
×
404

405
// MacaroonService returns the currently registered macaroon service. This might
406
// be nil if none was registered (yet).
UNCOV
407
func (r *InterceptorChain) MacaroonService() *macaroons.Service {
×
UNCOV
408
        r.RLock()
×
UNCOV
409
        defer r.RUnlock()
×
UNCOV
410

×
UNCOV
411
        return r.svc
×
UNCOV
412
}
×
413

414
// AddPermission adds a new macaroon rule for the given method.
UNCOV
415
func (r *InterceptorChain) AddPermission(method string, ops []bakery.Op) error {
×
UNCOV
416
        r.Lock()
×
UNCOV
417
        defer r.Unlock()
×
UNCOV
418

×
UNCOV
419
        if _, ok := r.permissionMap[method]; ok {
×
420
                return fmt.Errorf("detected duplicate macaroon constraints "+
×
421
                        "for path: %v", method)
×
422
        }
×
423

UNCOV
424
        r.permissionMap[method] = ops
×
UNCOV
425
        return nil
×
426
}
427

428
// Permissions returns the current set of macaroon permissions.
UNCOV
429
func (r *InterceptorChain) Permissions() map[string][]bakery.Op {
×
UNCOV
430
        r.RLock()
×
UNCOV
431
        defer r.RUnlock()
×
UNCOV
432

×
UNCOV
433
        // Make a copy under the read lock to avoid races.
×
UNCOV
434
        c := make(map[string][]bakery.Op)
×
UNCOV
435
        for k, v := range r.permissionMap {
×
UNCOV
436
                s := make([]bakery.Op, len(v))
×
UNCOV
437
                copy(s, v)
×
UNCOV
438
                c[k] = s
×
UNCOV
439
        }
×
440

UNCOV
441
        return c
×
442
}
443

444
// RegisterMiddleware registers a new middleware that will handle request/
445
// response interception for all RPC messages that are initiated with a custom
446
// macaroon caveat. The name of the custom caveat a middleware is handling is
447
// also its unique identifier. Only one middleware can be registered for each
448
// custom caveat.
UNCOV
449
func (r *InterceptorChain) RegisterMiddleware(mw *MiddlewareHandler) error {
×
UNCOV
450
        r.Lock()
×
UNCOV
451
        defer r.Unlock()
×
UNCOV
452

×
UNCOV
453
        // The name of the middleware is the unique identifier.
×
UNCOV
454
        _, ok := r.registeredMiddlewareNames[mw.middlewareName]
×
UNCOV
455
        if ok {
×
456
                return fmt.Errorf("a middleware with the name '%s' is already "+
×
457
                        "registered", mw.middlewareName)
×
458
        }
×
459

460
        // For now, we only want one middleware per custom caveat name. If we
461
        // allowed multiple middlewares handling the same caveat there would be
462
        // a need for extra call chaining logic, and they could overwrite each
463
        // other's responses.
UNCOV
464
        for _, middleware := range r.registeredMiddleware {
×
465
                if middleware.customCaveatName == mw.customCaveatName {
×
466
                        return fmt.Errorf("a middleware is already registered "+
×
467
                                "for the custom caveat name '%s': %v",
×
468
                                mw.customCaveatName, middleware.middlewareName)
×
469
                }
×
470
        }
471

UNCOV
472
        r.registeredMiddleware = append(r.registeredMiddleware, mw)
×
UNCOV
473
        index := len(r.registeredMiddleware) - 1
×
UNCOV
474
        r.registeredMiddlewareNames[mw.middlewareName] = index
×
UNCOV
475

×
UNCOV
476
        return nil
×
477
}
478

479
// RemoveMiddleware removes the middleware that handles the given custom caveat
480
// name.
UNCOV
481
func (r *InterceptorChain) RemoveMiddleware(middlewareName string) {
×
UNCOV
482
        r.Lock()
×
UNCOV
483
        defer r.Unlock()
×
UNCOV
484

×
UNCOV
485
        log.Debugf("Removing middleware %s", middlewareName)
×
UNCOV
486

×
UNCOV
487
        index, ok := r.registeredMiddlewareNames[middlewareName]
×
UNCOV
488
        if !ok {
×
489
                return
×
490
        }
×
UNCOV
491
        delete(r.registeredMiddlewareNames, middlewareName)
×
UNCOV
492

×
UNCOV
493
        r.registeredMiddleware = append(
×
UNCOV
494
                r.registeredMiddleware[:index],
×
UNCOV
495
                r.registeredMiddleware[index+1:]...,
×
UNCOV
496
        )
×
UNCOV
497

×
UNCOV
498
        // Re-initialise the middleware look-up map with the updated indexes.
×
UNCOV
499
        r.registeredMiddlewareNames = make(map[string]int)
×
UNCOV
500
        for i, mw := range r.registeredMiddleware {
×
501
                r.registeredMiddlewareNames[mw.middlewareName] = i
×
502
        }
×
503
}
504

505
// CustomCaveatSupported makes sure a middleware that handles the given custom
506
// caveat name is registered. If none is, an error is returned, signalling to
507
// the macaroon bakery and its validator to reject macaroons that have a custom
508
// caveat with that name.
509
//
510
// NOTE: This method is part of the macaroons.CustomCaveatAcceptor interface.
UNCOV
511
func (r *InterceptorChain) CustomCaveatSupported(customCaveatName string) error {
×
UNCOV
512
        r.RLock()
×
UNCOV
513
        defer r.RUnlock()
×
UNCOV
514

×
UNCOV
515
        // We only accept requests with a custom caveat if we also have a
×
UNCOV
516
        // middleware registered that handles that custom caveat. That is
×
UNCOV
517
        // crucial for security! Otherwise a request with an encumbered (=has
×
UNCOV
518
        // restricted permissions based upon the custom caveat condition)
×
UNCOV
519
        // macaroon would not be validated against the limitations that the
×
UNCOV
520
        // custom caveat implicate. Since the map is keyed by the _name_ of the
×
UNCOV
521
        // middleware, we need to loop through all of them to see if one has
×
UNCOV
522
        // the given custom macaroon caveat name.
×
UNCOV
523
        for _, middleware := range r.registeredMiddleware {
×
UNCOV
524
                if middleware.customCaveatName == customCaveatName {
×
UNCOV
525
                        return nil
×
UNCOV
526
                }
×
527
        }
528

UNCOV
529
        return fmt.Errorf("cannot accept macaroon with custom caveat '%s', "+
×
UNCOV
530
                "no middleware registered to handle it", customCaveatName)
×
531
}
532

533
// CreateServerOpts creates the GRPC server options that can be added to a GRPC
534
// server in order to add this InterceptorChain.
UNCOV
535
func (r *InterceptorChain) CreateServerOpts() []grpc.ServerOption {
×
UNCOV
536
        var unaryInterceptors []grpc.UnaryServerInterceptor
×
UNCOV
537
        var strmInterceptors []grpc.StreamServerInterceptor
×
UNCOV
538

×
UNCOV
539
        // The first interceptors we'll add to the chain is our logging
×
UNCOV
540
        // interceptors, so we can automatically log all errors that happen
×
UNCOV
541
        // during RPC calls.
×
UNCOV
542
        unaryInterceptors = append(
×
UNCOV
543
                unaryInterceptors, errorLogUnaryServerInterceptor(r.rpcsLog),
×
UNCOV
544
        )
×
UNCOV
545
        strmInterceptors = append(
×
UNCOV
546
                strmInterceptors, errorLogStreamServerInterceptor(r.rpcsLog),
×
UNCOV
547
        )
×
UNCOV
548

×
UNCOV
549
        // Next we'll add our RPC state check interceptors, that will check
×
UNCOV
550
        // whether the attempted call is allowed in the current state.
×
UNCOV
551
        unaryInterceptors = append(
×
UNCOV
552
                unaryInterceptors, r.rpcStateUnaryServerInterceptor(),
×
UNCOV
553
        )
×
UNCOV
554
        strmInterceptors = append(
×
UNCOV
555
                strmInterceptors, r.rpcStateStreamServerInterceptor(),
×
UNCOV
556
        )
×
UNCOV
557

×
UNCOV
558
        // We'll add the macaroon interceptors. If macaroons aren't disabled,
×
UNCOV
559
        // then these interceptors will enforce macaroon authentication.
×
UNCOV
560
        unaryInterceptors = append(
×
UNCOV
561
                unaryInterceptors, r.MacaroonUnaryServerInterceptor(),
×
UNCOV
562
        )
×
UNCOV
563
        strmInterceptors = append(
×
UNCOV
564
                strmInterceptors, r.MacaroonStreamServerInterceptor(),
×
UNCOV
565
        )
×
UNCOV
566

×
UNCOV
567
        // Next, we'll add the interceptors for our custom macaroon caveat based
×
UNCOV
568
        // middleware.
×
UNCOV
569
        unaryInterceptors = append(
×
UNCOV
570
                unaryInterceptors, r.middlewareUnaryServerInterceptor(),
×
UNCOV
571
        )
×
UNCOV
572
        strmInterceptors = append(
×
UNCOV
573
                strmInterceptors, r.middlewareStreamServerInterceptor(),
×
UNCOV
574
        )
×
UNCOV
575

×
UNCOV
576
        // Get interceptors for Prometheus to gather gRPC performance metrics.
×
UNCOV
577
        // If monitoring is disabled, GetPromInterceptors() will return empty
×
UNCOV
578
        // slices.
×
UNCOV
579
        promUnaryInterceptors, promStrmInterceptors :=
×
UNCOV
580
                monitoring.GetPromInterceptors()
×
UNCOV
581

×
UNCOV
582
        // Concatenate the slices of unary and stream interceptors respectively.
×
UNCOV
583
        unaryInterceptors = append(unaryInterceptors, promUnaryInterceptors...)
×
UNCOV
584
        strmInterceptors = append(strmInterceptors, promStrmInterceptors...)
×
UNCOV
585

×
UNCOV
586
        // Create server options from the interceptors we just set up.
×
UNCOV
587
        chainedUnary := grpc_middleware.WithUnaryServerChain(
×
UNCOV
588
                unaryInterceptors...,
×
UNCOV
589
        )
×
UNCOV
590
        chainedStream := grpc_middleware.WithStreamServerChain(
×
UNCOV
591
                strmInterceptors...,
×
UNCOV
592
        )
×
UNCOV
593
        serverOpts := []grpc.ServerOption{chainedUnary, chainedStream}
×
UNCOV
594

×
UNCOV
595
        return serverOpts
×
UNCOV
596
}
×
597

598
// errorLogUnaryServerInterceptor is a simple UnaryServerInterceptor that will
599
// automatically log any errors that occur when serving a client's unary
600
// request.
UNCOV
601
func errorLogUnaryServerInterceptor(logger btclog.Logger) grpc.UnaryServerInterceptor {
×
UNCOV
602
        return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
×
UNCOV
603
                handler grpc.UnaryHandler) (interface{}, error) {
×
UNCOV
604

×
UNCOV
605
                resp, err := handler(ctx, req)
×
UNCOV
606
                if err != nil {
×
UNCOV
607
                        // TODO(roasbeef): also log request details?
×
UNCOV
608
                        logger.Errorf("[%v]: %v", info.FullMethod, err)
×
UNCOV
609
                }
×
610

UNCOV
611
                return resp, err
×
612
        }
613
}
614

615
// errorLogStreamServerInterceptor is a simple StreamServerInterceptor that
616
// will log any errors that occur while processing a client or server streaming
617
// RPC.
UNCOV
618
func errorLogStreamServerInterceptor(logger btclog.Logger) grpc.StreamServerInterceptor {
×
UNCOV
619
        return func(srv interface{}, ss grpc.ServerStream,
×
UNCOV
620
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
UNCOV
621

×
UNCOV
622
                err := handler(srv, ss)
×
UNCOV
623
                if err != nil {
×
UNCOV
624
                        logger.Errorf("[%v]: %v", info.FullMethod, err)
×
UNCOV
625
                }
×
626

UNCOV
627
                return err
×
628
        }
629
}
630

631
// checkMacaroon validates that the context contains the macaroon needed to
632
// invoke the given RPC method.
633
func (r *InterceptorChain) checkMacaroon(ctx context.Context,
UNCOV
634
        fullMethod string) error {
×
UNCOV
635

×
UNCOV
636
        // If noMacaroons is set, we'll always allow the call.
×
UNCOV
637
        if r.noMacaroons {
×
638
                return nil
×
639
        }
×
640

641
        // Check whether the method is whitelisted, if so we'll allow it
642
        // regardless of macaroons.
UNCOV
643
        _, ok := macaroonWhitelist[fullMethod]
×
UNCOV
644
        if ok {
×
UNCOV
645
                return nil
×
UNCOV
646
        }
×
647

UNCOV
648
        r.RLock()
×
UNCOV
649
        svc := r.svc
×
UNCOV
650
        r.RUnlock()
×
UNCOV
651

×
UNCOV
652
        // If the macaroon service is not yet active, we cannot allow
×
UNCOV
653
        // the call.
×
UNCOV
654
        if svc == nil {
×
655
                return fmt.Errorf("unable to determine macaroon permissions")
×
656
        }
×
657

UNCOV
658
        r.RLock()
×
UNCOV
659
        uriPermissions, ok := r.permissionMap[fullMethod]
×
UNCOV
660
        r.RUnlock()
×
UNCOV
661
        if !ok {
×
662
                return fmt.Errorf("%s: unknown permissions required for method",
×
663
                        fullMethod)
×
664
        }
×
665

666
        // Find out if there is an external validator registered for
667
        // this method. Fall back to the internal one if there isn't.
UNCOV
668
        validator, ok := svc.ExternalValidators[fullMethod]
×
UNCOV
669
        if !ok {
×
UNCOV
670
                validator = svc
×
UNCOV
671
        }
×
672

673
        // Now that we know what validator to use, let it do its work.
UNCOV
674
        return validator.ValidateMacaroon(ctx, uriPermissions, fullMethod)
×
675
}
676

677
// MacaroonUnaryServerInterceptor is a GRPC interceptor that checks whether the
678
// request is authorized by the included macaroons.
UNCOV
679
func (r *InterceptorChain) MacaroonUnaryServerInterceptor() grpc.UnaryServerInterceptor {
×
UNCOV
680
        return func(ctx context.Context, req interface{},
×
UNCOV
681
                info *grpc.UnaryServerInfo,
×
UNCOV
682
                handler grpc.UnaryHandler) (interface{}, error) {
×
UNCOV
683

×
UNCOV
684
                // Check macaroons.
×
UNCOV
685
                if err := r.checkMacaroon(ctx, info.FullMethod); err != nil {
×
UNCOV
686
                        return nil, err
×
UNCOV
687
                }
×
688

UNCOV
689
                return handler(ctx, req)
×
690
        }
691
}
692

693
// MacaroonStreamServerInterceptor is a GRPC interceptor that checks whether
694
// the request is authorized by the included macaroons.
UNCOV
695
func (r *InterceptorChain) MacaroonStreamServerInterceptor() grpc.StreamServerInterceptor {
×
UNCOV
696
        return func(srv interface{}, ss grpc.ServerStream,
×
UNCOV
697
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
UNCOV
698

×
UNCOV
699
                // Check macaroons.
×
UNCOV
700
                err := r.checkMacaroon(ss.Context(), info.FullMethod)
×
UNCOV
701
                if err != nil {
×
702
                        return err
×
703
                }
×
704

UNCOV
705
                return handler(srv, ss)
×
706
        }
707
}
708

709
// checkRPCState checks whether a call to the given server is allowed in the
710
// current RPC state.
UNCOV
711
func (r *InterceptorChain) checkRPCState(srv interface{}) error {
×
UNCOV
712
        // The StateService is being accessed, we allow the call regardless of
×
UNCOV
713
        // the current state.
×
UNCOV
714
        _, ok := srv.(lnrpc.StateServer)
×
UNCOV
715
        if ok {
×
UNCOV
716
                return nil
×
UNCOV
717
        }
×
718

UNCOV
719
        r.RLock()
×
UNCOV
720
        state := r.state
×
UNCOV
721
        r.RUnlock()
×
UNCOV
722

×
UNCOV
723
        switch state {
×
724
        // Do not accept any RPC calls (unless to the state service) until LND
725
        // has not started.
726
        case waitingToStart:
×
727
                return ErrWaitingToStart
×
728

729
        // If the wallet does not exists, only calls to the WalletUnlocker are
730
        // accepted.
UNCOV
731
        case walletNotCreated:
×
UNCOV
732
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
UNCOV
733
                if !ok {
×
734
                        return ErrNoWallet
×
735
                }
×
736

737
        // If the wallet is locked, only calls to the WalletUnlocker are
738
        // accepted.
UNCOV
739
        case walletLocked:
×
UNCOV
740
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
UNCOV
741
                if !ok {
×
742
                        return ErrWalletLocked
×
743
                }
×
744

745
        // If the wallet is unlocked, but the RPC not yet active, we reject.
746
        case walletUnlocked:
×
747
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
748
                if ok {
×
749
                        return ErrWalletUnlocked
×
750
                }
×
751

752
                return ErrRPCStarting
×
753

754
        // If the RPC server or lnd server is active, we allow calls to any
755
        // service except the WalletUnlocker.
UNCOV
756
        case rpcActive, serverActive:
×
UNCOV
757
                _, ok := srv.(lnrpc.WalletUnlockerServer)
×
UNCOV
758
                if ok {
×
759
                        return ErrWalletUnlocked
×
760
                }
×
761

762
        default:
×
763
                return fmt.Errorf("unknown RPC state: %v", state)
×
764
        }
765

UNCOV
766
        return nil
×
767
}
768

769
// rpcStateUnaryServerInterceptor is a GRPC interceptor that checks whether
770
// calls to the given gGRPC server is allowed in the current rpc state.
UNCOV
771
func (r *InterceptorChain) rpcStateUnaryServerInterceptor() grpc.UnaryServerInterceptor {
×
UNCOV
772
        return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
×
UNCOV
773
                handler grpc.UnaryHandler) (interface{}, error) {
×
UNCOV
774

×
UNCOV
775
                r.rpcsLog.Debugf("[%v] requested", info.FullMethod)
×
UNCOV
776

×
UNCOV
777
                if err := r.checkRPCState(info.Server); err != nil {
×
778
                        return nil, err
×
779
                }
×
780

UNCOV
781
                return handler(ctx, req)
×
782
        }
783
}
784

785
// rpcStateStreamServerInterceptor is a GRPC interceptor that checks whether
786
// calls to the given gGRPC server is allowed in the current rpc state.
UNCOV
787
func (r *InterceptorChain) rpcStateStreamServerInterceptor() grpc.StreamServerInterceptor {
×
UNCOV
788
        return func(srv interface{}, ss grpc.ServerStream,
×
UNCOV
789
                info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
UNCOV
790

×
UNCOV
791
                r.rpcsLog.Debugf("[%v] requested", info.FullMethod)
×
UNCOV
792

×
UNCOV
793
                if err := r.checkRPCState(srv); err != nil {
×
794
                        return err
×
795
                }
×
796

UNCOV
797
                return handler(srv, ss)
×
798
        }
799
}
800

801
// middlewareUnaryServerInterceptor is a unary gRPC interceptor that intercepts
802
// all requests and responses that are sent with a macaroon containing a custom
803
// caveat condition that is handled by registered middleware.
UNCOV
804
func (r *InterceptorChain) middlewareUnaryServerInterceptor() grpc.UnaryServerInterceptor {
×
UNCOV
805
        return func(ctx context.Context,
×
UNCOV
806
                req interface{}, info *grpc.UnaryServerInfo,
×
UNCOV
807
                handler grpc.UnaryHandler) (interface{}, error) {
×
UNCOV
808

×
UNCOV
809
                // Make sure we don't allow any requests through if one of the
×
UNCOV
810
                // mandatory middlewares is missing.
×
UNCOV
811
                fullMethod := info.FullMethod
×
UNCOV
812
                if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
×
813
                        return nil, err
×
814
                }
×
815

816
                // If there is no middleware registered, we don't need to
817
                // intercept anything.
UNCOV
818
                if !r.middlewareRegistered() {
×
UNCOV
819
                        return handler(ctx, req)
×
UNCOV
820
                }
×
821

UNCOV
822
                requestID := atomic.AddUint64(&r.lastRequestID, 1)
×
UNCOV
823
                req, err := r.interceptMessage(
×
UNCOV
824
                        ctx, TypeRequest, requestID, false, info.FullMethod,
×
UNCOV
825
                        req,
×
UNCOV
826
                )
×
UNCOV
827
                if err != nil {
×
828
                        return nil, err
×
829
                }
×
830

831
                // Call the handler, which executes the request against lnd.
UNCOV
832
                lndResp, lndErr := handler(ctx, req)
×
UNCOV
833
                if lndErr != nil {
×
UNCOV
834
                        // The call to lnd ended in an error and not a normal
×
UNCOV
835
                        // proto message response. Send the error to the
×
UNCOV
836
                        // interceptor as well to inform about the abnormal
×
UNCOV
837
                        // termination of the stream and to give the option to
×
UNCOV
838
                        // replace the error message with a custom one.
×
UNCOV
839
                        replacedErr, err := r.interceptMessage(
×
UNCOV
840
                                ctx, TypeResponse, requestID, false,
×
UNCOV
841
                                info.FullMethod, lndErr,
×
UNCOV
842
                        )
×
UNCOV
843
                        if err != nil {
×
844
                                return nil, err
×
845
                        }
×
UNCOV
846
                        return lndResp, replacedErr.(error)
×
847
                }
848

UNCOV
849
                return r.interceptMessage(
×
UNCOV
850
                        ctx, TypeResponse, requestID, false, info.FullMethod,
×
UNCOV
851
                        lndResp,
×
UNCOV
852
                )
×
853
        }
854
}
855

856
// middlewareStreamServerInterceptor is a streaming gRPC interceptor that
857
// intercepts all requests and responses that are sent with a macaroon
858
// containing a custom caveat condition that is handled by registered
859
// middleware.
UNCOV
860
func (r *InterceptorChain) middlewareStreamServerInterceptor() grpc.StreamServerInterceptor {
×
UNCOV
861
        return func(srv interface{},
×
UNCOV
862
                ss grpc.ServerStream, info *grpc.StreamServerInfo,
×
UNCOV
863
                handler grpc.StreamHandler) error {
×
UNCOV
864

×
UNCOV
865
                // Don't intercept the interceptor itself which is a streaming
×
UNCOV
866
                // RPC too!
×
UNCOV
867
                fullMethod := info.FullMethod
×
UNCOV
868
                if fullMethod == lnrpc.RegisterRPCMiddlewareURI {
×
UNCOV
869
                        return handler(srv, ss)
×
UNCOV
870
                }
×
871

872
                // Make sure we don't allow any requests through if one of the
873
                // mandatory middlewares is missing. We add this check here to
874
                // make sure the middleware registration itself can still be
875
                // called.
UNCOV
876
                if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
×
877
                        return err
×
878
                }
×
879

880
                // If there is no middleware registered, we don't need to
881
                // intercept anything.
UNCOV
882
                if !r.middlewareRegistered() {
×
UNCOV
883
                        return handler(srv, ss)
×
UNCOV
884
                }
×
885

886
                // To give the middleware a chance to accept or reject the
887
                // establishment of the stream itself (and not only when the
888
                // first message is sent on the stream), we send an intercept
889
                // request for the stream auth now:
UNCOV
890
                msg, err := NewStreamAuthInterceptionRequest(
×
UNCOV
891
                        ss.Context(), info.FullMethod,
×
UNCOV
892
                )
×
UNCOV
893
                if err != nil {
×
894
                        return err
×
895
                }
×
896

UNCOV
897
                requestID := atomic.AddUint64(&r.lastRequestID, 1)
×
UNCOV
898
                err = r.acceptStream(requestID, msg)
×
UNCOV
899
                if err != nil {
×
900
                        return err
×
901
                }
×
902

UNCOV
903
                wrappedSS := &serverStreamWrapper{
×
UNCOV
904
                        ServerStream: ss,
×
UNCOV
905
                        requestID:    requestID,
×
UNCOV
906
                        fullMethod:   info.FullMethod,
×
UNCOV
907
                        interceptor:  r,
×
UNCOV
908
                }
×
UNCOV
909

×
UNCOV
910
                // Call the stream handler, which will block as long as the
×
UNCOV
911
                // stream is alive.
×
UNCOV
912
                lndErr := handler(srv, wrappedSS)
×
UNCOV
913
                if lndErr != nil {
×
UNCOV
914
                        // This is an error being returned from lnd. Send it to
×
UNCOV
915
                        // the interceptor as well to inform about the abnormal
×
UNCOV
916
                        // termination of the stream and to give the option to
×
UNCOV
917
                        // replace the error message with a custom one.
×
UNCOV
918
                        replacedErr, err := r.interceptMessage(
×
UNCOV
919
                                ss.Context(), TypeResponse, requestID,
×
UNCOV
920
                                true, info.FullMethod, lndErr,
×
UNCOV
921
                        )
×
UNCOV
922
                        if err != nil {
×
923
                                return err
×
924
                        }
×
925

UNCOV
926
                        return replacedErr.(error)
×
927
                }
928

929
                // Normal/successful termination of the stream.
UNCOV
930
                return nil
×
931
        }
932
}
933

934
// checkMandatoryMiddleware makes sure that each of the middlewares declared as
935
// mandatory is currently registered.
UNCOV
936
func (r *InterceptorChain) checkMandatoryMiddleware(fullMethod string) error {
×
UNCOV
937
        r.RLock()
×
UNCOV
938
        defer r.RUnlock()
×
UNCOV
939

×
UNCOV
940
        // Allow calls that are whitelisted for macaroons as well, otherwise we
×
UNCOV
941
        // get into impossible situations where the wallet is locked but the
×
UNCOV
942
        // unlock call is denied because the middleware isn't registered. But
×
UNCOV
943
        // the middleware cannot register itself because the wallet is locked.
×
UNCOV
944
        if _, ok := macaroonWhitelist[fullMethod]; ok {
×
UNCOV
945
                return nil
×
UNCOV
946
        }
×
947

948
        // Not a white listed call so make sure every mandatory middleware is
949
        // currently connected to lnd.
UNCOV
950
        for _, name := range r.mandatoryMiddleware {
×
951
                if _, ok := r.registeredMiddlewareNames[name]; !ok {
×
952
                        return fmt.Errorf("mandatory middleware '%s' is "+
×
953
                                "currently not registered, not allowing any "+
×
954
                                "RPC calls", name)
×
955
                }
×
956
        }
957

UNCOV
958
        return nil
×
959
}
960

961
// middlewareRegistered returns true if there is at least one middleware
962
// currently registered.
UNCOV
963
func (r *InterceptorChain) middlewareRegistered() bool {
×
UNCOV
964
        r.RLock()
×
UNCOV
965
        defer r.RUnlock()
×
UNCOV
966

×
UNCOV
967
        return len(r.registeredMiddleware) > 0
×
UNCOV
968
}
×
969

970
// acceptStream sends an intercept request to all middlewares that have
971
// registered for it. This means either a middleware has requested read-only
972
// access or the request actually has a macaroon with a caveat the middleware
973
// registered for.
974
func (r *InterceptorChain) acceptStream(requestID uint64,
UNCOV
975
        msg *InterceptionRequest) error {
×
UNCOV
976

×
UNCOV
977
        r.RLock()
×
UNCOV
978
        defer r.RUnlock()
×
UNCOV
979

×
UNCOV
980
        for _, middleware := range r.registeredMiddleware {
×
UNCOV
981
                // If there is a custom caveat in the macaroon, make sure the
×
UNCOV
982
                // middleware registered for it. Or if a middleware registered
×
UNCOV
983
                // for read-only mode, it also gets the request.
×
UNCOV
984
                hasCustomCaveat := macaroons.HasCustomCaveat(
×
UNCOV
985
                        msg.Macaroon, middleware.customCaveatName,
×
UNCOV
986
                )
×
UNCOV
987
                if !hasCustomCaveat && !middleware.readOnly {
×
988
                        continue
×
989
                }
990

UNCOV
991
                msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
×
UNCOV
992
                        msg.Macaroon, middleware.customCaveatName,
×
UNCOV
993
                )
×
UNCOV
994

×
UNCOV
995
                resp, err := middleware.intercept(requestID, msg)
×
UNCOV
996

×
UNCOV
997
                // Error during interception itself.
×
UNCOV
998
                if err != nil {
×
999
                        return err
×
1000
                }
×
1001

1002
                // Error returned from middleware client.
UNCOV
1003
                if resp.err != nil {
×
1004
                        return resp.err
×
1005
                }
×
1006
        }
1007

UNCOV
1008
        return nil
×
1009
}
1010

1011
// interceptMessage sends out an intercept request for an RPC response. Since
1012
// middleware that hasn't registered for the read-only mode has the option to
1013
// overwrite/replace the message, this needs to be handled differently than the
1014
// auth path above.
1015
func (r *InterceptorChain) interceptMessage(ctx context.Context,
1016
        interceptType InterceptType, requestID uint64, isStream bool,
UNCOV
1017
        fullMethod string, m interface{}) (interface{}, error) {
×
UNCOV
1018

×
UNCOV
1019
        r.RLock()
×
UNCOV
1020
        defer r.RUnlock()
×
UNCOV
1021

×
UNCOV
1022
        currentMessage := m
×
UNCOV
1023
        for _, middleware := range r.registeredMiddleware {
×
UNCOV
1024
                msg, err := NewMessageInterceptionRequest(
×
UNCOV
1025
                        ctx, interceptType, isStream, fullMethod,
×
UNCOV
1026
                        currentMessage,
×
UNCOV
1027
                )
×
UNCOV
1028
                if err != nil {
×
1029
                        return nil, err
×
1030
                }
×
1031

1032
                // If there is a custom caveat in the macaroon, make sure the
1033
                // middleware registered for it. Or if a middleware registered
1034
                // for read-only mode, it also gets the request.
UNCOV
1035
                hasCustomCaveat := macaroons.HasCustomCaveat(
×
UNCOV
1036
                        msg.Macaroon, middleware.customCaveatName,
×
UNCOV
1037
                )
×
UNCOV
1038
                if !hasCustomCaveat && !middleware.readOnly {
×
UNCOV
1039
                        continue
×
1040
                }
1041

UNCOV
1042
                msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
×
UNCOV
1043
                        msg.Macaroon, middleware.customCaveatName,
×
UNCOV
1044
                )
×
UNCOV
1045

×
UNCOV
1046
                resp, err := middleware.intercept(requestID, msg)
×
UNCOV
1047

×
UNCOV
1048
                // Error during interception itself.
×
UNCOV
1049
                if err != nil {
×
1050
                        return nil, err
×
1051
                }
×
1052

1053
                // Error returned from middleware client.
UNCOV
1054
                if resp.err != nil {
×
1055
                        return nil, resp.err
×
1056
                }
×
1057

1058
                // The message was replaced, make sure the next middleware in
1059
                // line receives the updated message.
UNCOV
1060
                if !middleware.readOnly && resp.replace {
×
UNCOV
1061
                        currentMessage = resp.replacement
×
UNCOV
1062
                }
×
1063
        }
1064

UNCOV
1065
        return currentMessage, nil
×
1066
}
1067

1068
// serverStreamWrapper is a struct that wraps a server stream in a way that all
1069
// requests and responses can be intercepted individually.
1070
type serverStreamWrapper struct {
1071
        // ServerStream is the stream that's being wrapped.
1072
        grpc.ServerStream
1073

1074
        requestID uint64
1075

1076
        fullMethod string
1077

1078
        interceptor *InterceptorChain
1079
}
1080

1081
// SendMsg is called when lnd sends a message to the client. This is wrapped to
1082
// intercept streaming RPC responses.
UNCOV
1083
func (w *serverStreamWrapper) SendMsg(m interface{}) error {
×
UNCOV
1084
        newMsg, err := w.interceptor.interceptMessage(
×
UNCOV
1085
                w.ServerStream.Context(), TypeResponse, w.requestID, true,
×
UNCOV
1086
                w.fullMethod, m,
×
UNCOV
1087
        )
×
UNCOV
1088
        if err != nil {
×
1089
                return err
×
1090
        }
×
1091

UNCOV
1092
        return w.ServerStream.SendMsg(newMsg)
×
1093
}
1094

1095
// RecvMsg is called when lnd wants to receive a message from the client. This
1096
// is wrapped to intercept streaming RPC requests.
UNCOV
1097
func (w *serverStreamWrapper) RecvMsg(m interface{}) error {
×
UNCOV
1098
        err := w.ServerStream.RecvMsg(m)
×
UNCOV
1099
        if err != nil {
×
1100
                return err
×
1101
        }
×
1102

UNCOV
1103
        req, err := w.interceptor.interceptMessage(
×
UNCOV
1104
                w.ServerStream.Context(), TypeRequest, w.requestID, true,
×
UNCOV
1105
                w.fullMethod, m,
×
UNCOV
1106
        )
×
UNCOV
1107
        if err != nil {
×
1108
                return err
×
1109
        }
×
1110

UNCOV
1111
        return replaceProtoMsg(m, req)
×
1112
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc