• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11954082915

21 Nov 2024 01:20PM UTC coverage: 59.327% (+0.6%) from 58.776%
11954082915

Pull #8754

github

ViktorTigerstrom
itest: wrap deriveCustomScopeAccounts at 80 chars

This commit fixes the word wrapping of the deriveCustomScopeAccounts
function docs, ensuring that they wrap at 80 characters or less.
Pull Request #8754: Add `Outbound` Remote Signer implementation

1940 of 2984 new or added lines in 44 files covered. (65.01%)

226 existing lines in 37 files now uncovered.

135234 of 227947 relevant lines covered (59.33%)

19316.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.01
/lnrpc/chainrpc/chain_server.go
1
//go:build chainrpc
2
// +build chainrpc
3

4
package chainrpc
5

6
import (
7
        "bytes"
8
        "context"
9
        "errors"
10
        "os"
11
        "path/filepath"
12
        "sync"
13
        "sync/atomic"
14

15
        "github.com/btcsuite/btcd/chaincfg/chainhash"
16
        "github.com/btcsuite/btcd/wire"
17
        "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
18
        "github.com/lightningnetwork/lnd/chainntnfs"
19
        "github.com/lightningnetwork/lnd/lnrpc"
20
        "github.com/lightningnetwork/lnd/macaroons"
21
        "google.golang.org/grpc"
22
        "gopkg.in/macaroon-bakery.v2/bakery"
23
)
24

25
// subServerName is the name of the RPC sub-server. It is the name this
// sub-server registers itself under, and the main SubServerConfigDispatcher
// instance is required to recognize it as the name of the config that we
// need.
const subServerName = "ChainRPC"
32

33
var (
34
        // macaroonOps are the set of capabilities that our minted macaroon (if
35
        // it doesn't already exist) will have.
36
        macaroonOps = []bakery.Op{
37
                {
38
                        Entity: "onchain",
39
                        Action: "read",
40
                },
41
        }
42

43
        // macPermissions maps RPC calls to the permissions they require.
44
        macPermissions = map[string][]bakery.Op{
45
                "/chainrpc.ChainKit/GetBlock": {{
46
                        Entity: "onchain",
47
                        Action: "read",
48
                }},
49
                "/chainrpc.ChainKit/GetBlockHeader": {{
50
                        Entity: "onchain",
51
                        Action: "read",
52
                }},
53
                "/chainrpc.ChainKit/GetBestBlock": {{
54
                        Entity: "onchain",
55
                        Action: "read",
56
                }},
57
                "/chainrpc.ChainKit/GetBlockHash": {{
58
                        Entity: "onchain",
59
                        Action: "read",
60
                }},
61
                "/chainrpc.ChainNotifier/RegisterConfirmationsNtfn": {{
62
                        Entity: "onchain",
63
                        Action: "read",
64
                }},
65
                "/chainrpc.ChainNotifier/RegisterSpendNtfn": {{
66
                        Entity: "onchain",
67
                        Action: "read",
68
                }},
69
                "/chainrpc.ChainNotifier/RegisterBlockEpochNtfn": {{
70
                        Entity: "onchain",
71
                        Action: "read",
72
                }},
73
        }
74

75
        // DefaultChainNotifierMacFilename is the default name of the chain
76
        // notifier macaroon that we expect to find via a file handle within the
77
        // main configuration file in this package.
78
        DefaultChainNotifierMacFilename = "chainnotifier.macaroon"
79

80
        // ErrChainNotifierServerShuttingDown is an error returned when we are
81
        // waiting for a notification to arrive but the chain notifier server
82
        // has been shut down.
83
        ErrChainNotifierServerShuttingDown = errors.New("chain notifier RPC " +
84
                "subserver shutting down")
85

86
        // ErrChainNotifierServerNotActive indicates that the chain notifier hasn't
87
        // finished the startup process.
88
        ErrChainNotifierServerNotActive = errors.New("chain notifier RPC is " +
89
                "still in the process of starting")
90
)
91

92
// ServerShell is a shell struct holding a reference to the actual sub-server.
93
// It is used to register the gRPC sub-server with the root server before we
94
// have the necessary dependencies to populate the actual sub-server.
95
type ServerShell struct {
96
        ChainKitServer
97
        ChainNotifierServer
98
}
99

100
// Server is a sub-server of the main RPC server. It serves the chainkit RPC
101
// and chain notifier RPC. This RPC sub-server allows external callers to access
102
// the full chainkit and chain notifier capabilities of lnd. This allows callers
103
// to create custom protocols, external to lnd, even backed by multiple distinct
104
// lnd across independent failure domains.
105
type Server struct {
106
        injected int32 // To be used atomically.
107

108
        // Required by the grpc-gateway/v2 library for forward compatibility.
109
        UnimplementedChainNotifierServer
110
        UnimplementedChainKitServer
111

112
        stopped sync.Once
113

114
        cfg Config
115

116
        quit chan struct{}
117
}
118

119
// New returns a new instance of the chainrpc ChainNotifier sub-server. We also
120
// return the set of permissions for the macaroons that we may create within
121
// this method. If the macaroons we need aren't found in the filepath, then
122
// we'll create them on start up. If we're unable to locate, or create the
123
// macaroons we need, then we'll return with an error.
124
func New() (*Server, lnrpc.MacaroonPerms, error) {
4✔
125
        return &Server{
4✔
126
                cfg:  Config{},
4✔
127
                quit: make(chan struct{}),
4✔
128
        }, macPermissions, nil
4✔
129
}
4✔
130

131
// Compile-time checks to ensure that Server fully implements the
132
// ChainNotifierServer gRPC service, ChainKitServer gRPC service, and
133
// lnrpc.SubServer interface.
134
var _ ChainNotifierServer = (*Server)(nil)
135
var _ ChainKitServer = (*Server)(nil)
136
var _ lnrpc.SubServer = (*Server)(nil)
137

138
// Stop signals any active goroutines for a graceful closure.
139
//
140
// NOTE: This is part of the lnrpc.SubServer interface.
141
func (s *Server) Stop() error {
4✔
142
        s.stopped.Do(func() {
8✔
143
                close(s.quit)
4✔
144
        })
4✔
145

146
        return nil
4✔
147
}
148

149
// InjectDependencies populates the sub-server's dependencies. If the
150
// finalizeDependencies boolean is true, then the sub-server will finalize its
151
// dependencies and return an error if any required dependencies are missing.
152
//
153
// NOTE: This is part of the lnrpc.SubServer interface.
154
func (s *Server) InjectDependencies(
155
        configRegistry lnrpc.SubServerConfigDispatcher,
156
        finalizeDependencies bool) error {
4✔
157

4✔
158
        if finalizeDependencies && atomic.AddInt32(&s.injected, 1) != 1 {
4✔
NEW
159
                return lnrpc.ErrDependenciesFinalized
×
NEW
160
        }
×
161

162
        cfg, err := getConfig(configRegistry, finalizeDependencies)
4✔
163
        if err != nil {
4✔
NEW
164
                return err
×
NEW
165
        }
×
166

167
        if finalizeDependencies {
8✔
168
                s.cfg = *cfg
4✔
169

4✔
170
                return nil
4✔
171
        }
4✔
172

173
        // If the path of the chain notifier macaroon wasn't generated, then
174
        // we'll assume that it's found at the default network directory.
UNCOV
175
        if cfg.ChainNotifierMacPath == "" {
×
UNCOV
176
                cfg.ChainNotifierMacPath = filepath.Join(
×
UNCOV
177
                        cfg.NetworkDir, DefaultChainNotifierMacFilename,
×
UNCOV
178
                )
×
UNCOV
179
        }
×
180

181
        // Now that we know the full path of the chain notifier macaroon, we can
182
        // check to see if we need to create it or not. If stateless_init is set
183
        // then we don't write the macaroons.
UNCOV
184
        macFilePath := cfg.ChainNotifierMacPath
×
UNCOV
185
        if cfg.MacService != nil && !cfg.MacService.StatelessInit &&
×
UNCOV
186
                !lnrpc.FileExists(macFilePath) {
×
UNCOV
187

×
NEW
188
                log.Infof("Baking macaroons for ChainNotifier RPC Server "+
×
NEW
189
                        "at: %v", macFilePath)
×
UNCOV
190

×
UNCOV
191
                // At this point, we know that the chain notifier macaroon
×
UNCOV
192
                // doesn't yet, exist, so we need to create it with the help of
×
UNCOV
193
                // the main macaroon service.
×
UNCOV
194
                chainNotifierMac, err := cfg.MacService.NewMacaroon(
×
UNCOV
195
                        context.Background(), macaroons.DefaultRootKeyID,
×
UNCOV
196
                        macaroonOps...,
×
UNCOV
197
                )
×
UNCOV
198
                if err != nil {
×
NEW
199
                        return err
×
200
                }
×
UNCOV
201
                chainNotifierMacBytes, err := chainNotifierMac.M().MarshalBinary()
×
UNCOV
202
                if err != nil {
×
NEW
203
                        return err
×
204
                }
×
UNCOV
205
                err = os.WriteFile(macFilePath, chainNotifierMacBytes, 0644)
×
UNCOV
206
                if err != nil {
×
207
                        _ = os.Remove(macFilePath)
×
NEW
208
                        return err
×
209
                }
×
210
        }
211

NEW
212
        s.cfg = *cfg
×
UNCOV
213

×
UNCOV
214
        return nil
×
215
}
216

217
// Name returns a unique string representation of the sub-server. This can be
218
// used to identify the sub-server and also de-duplicate them.
219
//
220
// NOTE: This is part of the lnrpc.SubServer interface.
221
func (s *Server) Name() string {
4✔
222
        return subServerName
4✔
223
}
4✔
224

225
// RegisterWithRootServer will be called by the root gRPC server to direct a RPC
226
// sub-server to register itself with the main gRPC root server. Until this is
227
// called, each sub-server won't be able to have requests routed towards it.
228
//
229
// NOTE: This is part of the lnrpc.GrpcHandler interface.
230
func (r *ServerShell) RegisterWithRootServer(grpcServer *grpc.Server) error {
4✔
231
        // We make sure that we register it with the main gRPC server to ensure
4✔
232
        // all our methods are routed properly.
4✔
233
        RegisterChainNotifierServer(grpcServer, r)
4✔
234
        log.Debug("ChainNotifier RPC server successfully register with root " +
4✔
235
                "gRPC server")
4✔
236

4✔
237
        RegisterChainKitServer(grpcServer, r)
4✔
238
        log.Debug("ChainKit RPC server successfully register with root gRPC " +
4✔
239
                "server")
4✔
240

4✔
241
        return nil
4✔
242
}
4✔
243

244
// RegisterWithRestServer will be called by the root REST mux to direct a sub
245
// RPC server to register itself with the main REST mux server. Until this is
246
// called, each sub-server won't be able to have requests routed towards it.
247
//
248
// NOTE: This is part of the lnrpc.GrpcHandler interface.
249
func (r *ServerShell) RegisterWithRestServer(ctx context.Context,
250
        mux *runtime.ServeMux, dest string, opts []grpc.DialOption) error {
4✔
251

4✔
252
        // We make sure that we register it with the main REST server to ensure
4✔
253
        // all our methods are routed properly.
4✔
254
        err := RegisterChainNotifierHandlerFromEndpoint(ctx, mux, dest, opts)
4✔
255
        if err != nil {
4✔
256
                log.Errorf("Could not register ChainNotifier REST server "+
×
257
                        "with root REST server: %v", err)
×
258
                return err
×
259
        }
×
260

261
        log.Debugf("ChainNotifier REST server successfully registered with " +
4✔
262
                "root REST server")
4✔
263

4✔
264
        // Register chainkit with the main REST server to ensure all our methods
4✔
265
        // are routed properly.
4✔
266
        err = RegisterChainKitHandlerFromEndpoint(ctx, mux, dest, opts)
4✔
267
        if err != nil {
4✔
268
                log.Errorf("Could not register ChainKit REST server with root "+
×
269
                        "REST server: %v", err)
×
270
                return err
×
271
        }
×
272
        log.Debugf("ChainKit REST server successfully registered with root " +
4✔
273
                "REST server")
4✔
274

4✔
275
        return nil
4✔
276
}
277

278
// CreateSubServer creates an instance of the sub-server, and returns the
279
// macaroon permissions that the sub-server wishes to pass on to the root server
280
// for all methods routed towards it.
281
//
282
// NOTE: This is part of the lnrpc.GrpcHandler interface.
283
func (r *ServerShell) CreateSubServer() (
284
        lnrpc.SubServer, lnrpc.MacaroonPerms, error) {
4✔
285

4✔
286
        subServer, macPermissions, err := New()
4✔
287
        if err != nil {
4✔
288
                return nil, nil, err
×
289
        }
×
290

291
        r.ChainNotifierServer = subServer
4✔
292
        r.ChainKitServer = subServer
4✔
293
        return subServer, macPermissions, nil
4✔
294
}
295

296
// GetBlock returns a block given the corresponding block hash.
297
func (s *Server) GetBlock(_ context.Context,
298
        in *GetBlockRequest) (*GetBlockResponse, error) {
4✔
299

4✔
300
        // We'll start by reconstructing the RPC request into what the
4✔
301
        // underlying chain functionality expects.
4✔
302
        var blockHash chainhash.Hash
4✔
303
        copy(blockHash[:], in.BlockHash)
4✔
304

4✔
305
        block, err := s.cfg.Chain.GetBlock(&blockHash)
4✔
306
        if err != nil {
4✔
307
                return nil, err
×
308
        }
×
309

310
        // Serialize block for RPC response.
311
        var blockBuf bytes.Buffer
4✔
312
        err = block.Serialize(&blockBuf)
4✔
313
        if err != nil {
4✔
314
                return nil, err
×
315
        }
×
316
        rawBlock := blockBuf.Bytes()
4✔
317

4✔
318
        return &GetBlockResponse{RawBlock: rawBlock}, nil
4✔
319
}
320

321
// GetBlockHeader returns a block header given the corresponding block hash.
322
func (s *Server) GetBlockHeader(_ context.Context,
323
        in *GetBlockHeaderRequest) (*GetBlockHeaderResponse, error) {
4✔
324

4✔
325
        // We'll start by reconstructing the RPC request into what the
4✔
326
        // underlying chain functionality expects.
4✔
327
        var blockHash chainhash.Hash
4✔
328
        copy(blockHash[:], in.BlockHash)
4✔
329

4✔
330
        blockHeader, err := s.cfg.Chain.GetBlockHeader(&blockHash)
4✔
331
        if err != nil {
4✔
332
                return nil, err
×
333
        }
×
334

335
        // Serialize block header for RPC response.
336
        var headerBuf bytes.Buffer
4✔
337
        err = blockHeader.Serialize(&headerBuf)
4✔
338
        if err != nil {
4✔
339
                return nil, err
×
340
        }
×
341
        rawHeader := headerBuf.Bytes()
4✔
342

4✔
343
        return &GetBlockHeaderResponse{RawBlockHeader: rawHeader}, nil
4✔
344
}
345

346
// GetBestBlock returns the latest block hash and current height of the valid
347
// most-work chain.
348
func (s *Server) GetBestBlock(_ context.Context,
349
        _ *GetBestBlockRequest) (*GetBestBlockResponse, error) {
4✔
350

4✔
351
        blockHash, blockHeight, err := s.cfg.Chain.GetBestBlock()
4✔
352
        if err != nil {
4✔
353
                return nil, err
×
354
        }
×
355

356
        return &GetBestBlockResponse{
4✔
357
                BlockHash:   blockHash[:],
4✔
358
                BlockHeight: blockHeight,
4✔
359
        }, nil
4✔
360
}
361

362
// GetBlockHash returns the hash of the block in the best blockchain
363
// at the given height.
364
func (s *Server) GetBlockHash(_ context.Context,
365
        req *GetBlockHashRequest) (*GetBlockHashResponse, error) {
4✔
366

4✔
367
        blockHash, err := s.cfg.Chain.GetBlockHash(req.BlockHeight)
4✔
368
        if err != nil {
4✔
369
                return nil, err
×
370
        }
×
371

372
        return &GetBlockHashResponse{
4✔
373
                BlockHash: blockHash[:],
4✔
374
        }, nil
4✔
375
}
376

377
// RegisterConfirmationsNtfn is a synchronous response-streaming RPC that
378
// registers an intent for a client to be notified once a confirmation request
379
// has reached its required number of confirmations on-chain.
380
//
381
// A client can specify whether the confirmation request should be for a
382
// particular transaction by its hash or for an output script by specifying a
383
// zero hash.
384
//
385
// NOTE: This is part of the chainrpc.ChainNotifierService interface.
386
func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest,
387
        confStream ChainNotifier_RegisterConfirmationsNtfnServer) error {
4✔
388

4✔
389
        if !s.cfg.ChainNotifier.Started() {
4✔
390
                return ErrChainNotifierServerNotActive
×
391
        }
×
392

393
        // We'll start by reconstructing the RPC request into what the
394
        // underlying ChainNotifier expects.
395
        var txid chainhash.Hash
4✔
396
        copy(txid[:], in.Txid)
4✔
397

4✔
398
        var opts []chainntnfs.NotifierOption
4✔
399
        if in.IncludeBlock {
8✔
400
                opts = append(opts, chainntnfs.WithIncludeBlock())
4✔
401
        }
4✔
402

403
        // We'll then register for the spend notification of the request.
404
        confEvent, err := s.cfg.ChainNotifier.RegisterConfirmationsNtfn(
4✔
405
                &txid, in.Script, in.NumConfs, in.HeightHint, opts...,
4✔
406
        )
4✔
407
        if err != nil {
4✔
408
                return err
×
409
        }
×
410
        defer confEvent.Cancel()
4✔
411

4✔
412
        // With the request registered, we'll wait for its spend notification to
4✔
413
        // be dispatched.
4✔
414
        for {
8✔
415
                select {
4✔
416
                // The transaction satisfying the request has confirmed on-chain
417
                // and reached its required number of confirmations. We'll
418
                // dispatch an event to the caller indicating so.
419
                case details, ok := <-confEvent.Confirmed:
4✔
420
                        if !ok {
4✔
421
                                return chainntnfs.ErrChainNotifierShuttingDown
×
422
                        }
×
423

424
                        var rawTxBuf bytes.Buffer
4✔
425
                        err := details.Tx.Serialize(&rawTxBuf)
4✔
426
                        if err != nil {
4✔
427
                                return err
×
428
                        }
×
429

430
                        // If the block was included (should only be there if
431
                        // IncludeBlock is true), then we'll encode the bytes
432
                        // to send with the response.
433
                        var blockBytes []byte
4✔
434
                        if details.Block != nil {
8✔
435
                                var blockBuf bytes.Buffer
4✔
436
                                err := details.Block.Serialize(&blockBuf)
4✔
437
                                if err != nil {
4✔
438
                                        return err
×
439
                                }
×
440

441
                                blockBytes = blockBuf.Bytes()
4✔
442
                        }
443

444
                        rpcConfDetails := &ConfDetails{
4✔
445
                                RawTx:       rawTxBuf.Bytes(),
4✔
446
                                BlockHash:   details.BlockHash[:],
4✔
447
                                BlockHeight: details.BlockHeight,
4✔
448
                                TxIndex:     details.TxIndex,
4✔
449
                                RawBlock:    blockBytes,
4✔
450
                        }
4✔
451

4✔
452
                        conf := &ConfEvent{
4✔
453
                                Event: &ConfEvent_Conf{
4✔
454
                                        Conf: rpcConfDetails,
4✔
455
                                },
4✔
456
                        }
4✔
457
                        if err := confStream.Send(conf); err != nil {
4✔
458
                                return err
×
459
                        }
×
460

461
                // The transaction satisfying the request has been reorged out
462
                // of the chain, so we'll send an event describing it.
463
                case _, ok := <-confEvent.NegativeConf:
×
464
                        if !ok {
×
465
                                return chainntnfs.ErrChainNotifierShuttingDown
×
466
                        }
×
467

468
                        reorg := &ConfEvent{
×
469
                                Event: &ConfEvent_Reorg{Reorg: &Reorg{}},
×
470
                        }
×
471
                        if err := confStream.Send(reorg); err != nil {
×
472
                                return err
×
473
                        }
×
474

475
                // The transaction satisfying the request has confirmed and is
476
                // no longer under the risk of being reorged out of the chain,
477
                // so we can safely exit.
478
                case _, ok := <-confEvent.Done:
×
479
                        if !ok {
×
480
                                return chainntnfs.ErrChainNotifierShuttingDown
×
481
                        }
×
482

483
                        return nil
×
484

485
                // The response stream's context for whatever reason has been
486
                // closed. If context is closed by an exceeded deadline we will
487
                // return an error.
488
                case <-confStream.Context().Done():
4✔
489
                        if errors.Is(confStream.Context().Err(), context.Canceled) {
8✔
490
                                return nil
4✔
491
                        }
4✔
492
                        return confStream.Context().Err()
×
493

494
                // The server has been requested to shut down.
495
                case <-s.quit:
×
496
                        return ErrChainNotifierServerShuttingDown
×
497
                }
498
        }
499
}
500

501
// RegisterSpendNtfn is a synchronous response-streaming RPC that registers an
502
// intent for a client to be notification once a spend request has been spent by
503
// a transaction that has confirmed on-chain.
504
//
505
// A client can specify whether the spend request should be for a particular
506
// outpoint  or for an output script by specifying a zero outpoint.
507
//
508
// NOTE: This is part of the chainrpc.ChainNotifierService interface.
509
func (s *Server) RegisterSpendNtfn(in *SpendRequest,
510
        spendStream ChainNotifier_RegisterSpendNtfnServer) error {
4✔
511

4✔
512
        if !s.cfg.ChainNotifier.Started() {
4✔
513
                return ErrChainNotifierServerNotActive
×
514
        }
×
515

516
        // We'll start by reconstructing the RPC request into what the
517
        // underlying ChainNotifier expects.
518
        var op *wire.OutPoint
4✔
519
        if in.Outpoint != nil {
8✔
520
                var txid chainhash.Hash
4✔
521
                copy(txid[:], in.Outpoint.Hash)
4✔
522
                op = &wire.OutPoint{Hash: txid, Index: in.Outpoint.Index}
4✔
523
        }
4✔
524

525
        // We'll then register for the spend notification of the request.
526
        spendEvent, err := s.cfg.ChainNotifier.RegisterSpendNtfn(
4✔
527
                op, in.Script, in.HeightHint,
4✔
528
        )
4✔
529
        if err != nil {
8✔
530
                return err
4✔
531
        }
4✔
532
        defer spendEvent.Cancel()
4✔
533

4✔
534
        // With the request registered, we'll wait for its spend notification to
4✔
535
        // be dispatched.
4✔
536
        for {
8✔
537
                select {
4✔
538
                // A transaction that spends the given has confirmed on-chain.
539
                // We'll return an event to the caller indicating so that
540
                // includes the details of the spending transaction.
541
                case details, ok := <-spendEvent.Spend:
4✔
542
                        if !ok {
4✔
543
                                return chainntnfs.ErrChainNotifierShuttingDown
×
544
                        }
×
545

546
                        var rawSpendingTxBuf bytes.Buffer
4✔
547
                        err := details.SpendingTx.Serialize(&rawSpendingTxBuf)
4✔
548
                        if err != nil {
4✔
549
                                return err
×
550
                        }
×
551

552
                        rpcSpendDetails := &SpendDetails{
4✔
553
                                SpendingOutpoint: &Outpoint{
4✔
554
                                        Hash:  details.SpentOutPoint.Hash[:],
4✔
555
                                        Index: details.SpentOutPoint.Index,
4✔
556
                                },
4✔
557
                                RawSpendingTx:      rawSpendingTxBuf.Bytes(),
4✔
558
                                SpendingTxHash:     details.SpenderTxHash[:],
4✔
559
                                SpendingInputIndex: details.SpenderInputIndex,
4✔
560
                                SpendingHeight:     uint32(details.SpendingHeight),
4✔
561
                        }
4✔
562

4✔
563
                        spend := &SpendEvent{
4✔
564
                                Event: &SpendEvent_Spend{
4✔
565
                                        Spend: rpcSpendDetails,
4✔
566
                                },
4✔
567
                        }
4✔
568
                        if err := spendStream.Send(spend); err != nil {
4✔
569
                                return err
×
570
                        }
×
571

572
                // The spending transaction of the request has been reorged of
573
                // the chain. We'll return an event to the caller indicating so.
574
                case _, ok := <-spendEvent.Reorg:
×
575
                        if !ok {
×
576
                                return chainntnfs.ErrChainNotifierShuttingDown
×
577
                        }
×
578

579
                        reorg := &SpendEvent{
×
580
                                Event: &SpendEvent_Reorg{Reorg: &Reorg{}},
×
581
                        }
×
582
                        if err := spendStream.Send(reorg); err != nil {
×
583
                                return err
×
584
                        }
×
585

586
                // The spending transaction of the requests has confirmed
587
                // on-chain and is no longer under the risk of being reorged out
588
                // of the chain, so we can safely exit.
589
                case _, ok := <-spendEvent.Done:
×
590
                        if !ok {
×
591
                                return chainntnfs.ErrChainNotifierShuttingDown
×
592
                        }
×
593

594
                        return nil
×
595

596
                // The response stream's context for whatever reason has been
597
                // closed. If context is closed by an exceeded deadline we will
598
                // return an error.
599
                case <-spendStream.Context().Done():
4✔
600
                        if errors.Is(spendStream.Context().Err(), context.Canceled) {
8✔
601
                                return nil
4✔
602
                        }
4✔
603
                        return spendStream.Context().Err()
×
604

605
                // The server has been requested to shut down.
606
                case <-s.quit:
×
607
                        return ErrChainNotifierServerShuttingDown
×
608
                }
609
        }
610
}
611

612
// RegisterBlockEpochNtfn is a synchronous response-streaming RPC that registers
613
// an intent for a client to be notified of blocks in the chain. The stream will
614
// return a hash and height tuple of a block for each new/stale block in the
615
// chain. It is the client's responsibility to determine whether the tuple
616
// returned is for a new or stale block in the chain.
617
//
618
// A client can also request a historical backlog of blocks from a particular
619
// point. This allows clients to be idempotent by ensuring that they do not
620
// missing processing a single block within the chain.
621
//
622
// NOTE: This is part of the chainrpc.ChainNotifierService interface.
623
func (s *Server) RegisterBlockEpochNtfn(in *BlockEpoch,
624
        epochStream ChainNotifier_RegisterBlockEpochNtfnServer) error {
4✔
625

4✔
626
        if !s.cfg.ChainNotifier.Started() {
4✔
627
                return ErrChainNotifierServerNotActive
×
628
        }
×
629

630
        // We'll start by reconstructing the RPC request into what the
631
        // underlying ChainNotifier expects.
632
        var hash chainhash.Hash
4✔
633
        copy(hash[:], in.Hash)
4✔
634

4✔
635
        // If the request isn't for a zero hash and a zero height, then we
4✔
636
        // should deliver a backlog of notifications from the given block
4✔
637
        // (hash/height tuple) until tip, and continue delivering epochs for
4✔
638
        // new blocks.
4✔
639
        var blockEpoch *chainntnfs.BlockEpoch
4✔
640
        if hash != chainntnfs.ZeroHash && in.Height != 0 {
8✔
641
                blockEpoch = &chainntnfs.BlockEpoch{
4✔
642
                        Hash:   &hash,
4✔
643
                        Height: int32(in.Height),
4✔
644
                }
4✔
645
        }
4✔
646

647
        epochEvent, err := s.cfg.ChainNotifier.RegisterBlockEpochNtfn(blockEpoch)
4✔
648
        if err != nil {
4✔
649
                return err
×
650
        }
×
651
        defer epochEvent.Cancel()
4✔
652

4✔
653
        for {
8✔
654
                select {
4✔
655
                // A notification for a block has been received. This block can
656
                // either be a new block or stale.
657
                case blockEpoch, ok := <-epochEvent.Epochs:
4✔
658
                        if !ok {
4✔
659
                                return chainntnfs.ErrChainNotifierShuttingDown
×
660
                        }
×
661

662
                        epoch := &BlockEpoch{
4✔
663
                                Hash:   blockEpoch.Hash[:],
4✔
664
                                Height: uint32(blockEpoch.Height),
4✔
665
                        }
4✔
666
                        if err := epochStream.Send(epoch); err != nil {
4✔
667
                                return err
×
668
                        }
×
669

670
                // The response stream's context for whatever reason has been
671
                // closed. If context is closed by an exceeded deadline we will
672
                // return an error.
673
                case <-epochStream.Context().Done():
4✔
674
                        if errors.Is(epochStream.Context().Err(), context.Canceled) {
8✔
675
                                return nil
4✔
676
                        }
4✔
677
                        return epochStream.Context().Err()
×
678

679
                // The server has been requested to shut down.
680
                case <-s.quit:
×
681
                        return ErrChainNotifierServerShuttingDown
×
682
                }
683
        }
684
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc