• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12412879685

19 Dec 2024 12:40PM UTC coverage: 58.744% (+0.09%) from 58.653%
12412879685

Pull #8754

github

ViktorTigerstrom
itest: wrap deriveCustomScopeAccounts at 80 chars

This commit fixes the word wrapping of the deriveCustomScopeAccounts
function docs, ensuring that they wrap at 80 characters or less.
Pull Request #8754: Add `Outbound` Remote Signer implementation

1858 of 2816 new or added lines in 47 files covered. (65.98%)

267 existing lines in 51 files now uncovered.

136038 of 231578 relevant lines covered (58.74%)

19020.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.01
/lnrpc/chainrpc/chain_server.go
1
//go:build chainrpc
2
// +build chainrpc
3

4
package chainrpc
5

6
import (
7
        "bytes"
8
        "context"
9
        "errors"
10
        "os"
11
        "path/filepath"
12
        "sync"
13
        "sync/atomic"
14

15
        "github.com/btcsuite/btcd/chaincfg/chainhash"
16
        "github.com/btcsuite/btcd/wire"
17
        "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
18
        "github.com/lightningnetwork/lnd/chainntnfs"
19
        "github.com/lightningnetwork/lnd/lnrpc"
20
        "github.com/lightningnetwork/lnd/macaroons"
21
        "google.golang.org/grpc"
22
        "gopkg.in/macaroon-bakery.v2/bakery"
23
)
24

25
const (
26
        // subServerName is the name of the RPC sub-server. We'll use this name
27
        // to register ourselves, and we also require that the main
28
        // SubServerConfigDispatcher instance recognize this as the name of the
29
        // config file that we need.
30
        subServerName = "ChainRPC"
31
)
32

33
var (
34
        // macaroonOps are the set of capabilities that our minted macaroon (if
35
        // it doesn't already exist) will have.
36
        macaroonOps = []bakery.Op{
37
                {
38
                        Entity: "onchain",
39
                        Action: "read",
40
                },
41
        }
42

43
        // macPermissions maps RPC calls to the permissions they require.
44
        macPermissions = map[string][]bakery.Op{
45
                "/chainrpc.ChainKit/GetBlock": {{
46
                        Entity: "onchain",
47
                        Action: "read",
48
                }},
49
                "/chainrpc.ChainKit/GetBlockHeader": {{
50
                        Entity: "onchain",
51
                        Action: "read",
52
                }},
53
                "/chainrpc.ChainKit/GetBestBlock": {{
54
                        Entity: "onchain",
55
                        Action: "read",
56
                }},
57
                "/chainrpc.ChainKit/GetBlockHash": {{
58
                        Entity: "onchain",
59
                        Action: "read",
60
                }},
61
                "/chainrpc.ChainNotifier/RegisterConfirmationsNtfn": {{
62
                        Entity: "onchain",
63
                        Action: "read",
64
                }},
65
                "/chainrpc.ChainNotifier/RegisterSpendNtfn": {{
66
                        Entity: "onchain",
67
                        Action: "read",
68
                }},
69
                "/chainrpc.ChainNotifier/RegisterBlockEpochNtfn": {{
70
                        Entity: "onchain",
71
                        Action: "read",
72
                }},
73
        }
74

75
        // DefaultChainNotifierMacFilename is the default name of the chain
76
        // notifier macaroon that we expect to find via a file handle within the
77
        // main configuration file in this package.
78
        DefaultChainNotifierMacFilename = "chainnotifier.macaroon"
79

80
        // ErrChainNotifierServerShuttingDown is an error returned when we are
81
        // waiting for a notification to arrive but the chain notifier server
82
        // has been shut down.
83
        ErrChainNotifierServerShuttingDown = errors.New("chain notifier RPC " +
84
                "subserver shutting down")
85

86
        // ErrChainNotifierServerNotActive indicates that the chain notifier hasn't
87
        // finished the startup process.
88
        ErrChainNotifierServerNotActive = errors.New("chain notifier RPC is " +
89
                "still in the process of starting")
90
)
91

92
// ServerShell is a shell struct holding a reference to the actual sub-server.
93
// It is used to register the gRPC sub-server with the root server before we
94
// have the necessary dependencies to populate the actual sub-server.
95
type ServerShell struct {
96
        ChainKitServer
97
        ChainNotifierServer
98
}
99

100
// Server is a sub-server of the main RPC server. It serves the chainkit RPC
101
// and chain notifier RPC. This RPC sub-server allows external callers to access
102
// the full chainkit and chain notifier capabilities of lnd. This allows callers
103
// to create custom protocols, external to lnd, even backed by multiple distinct
104
// lnd across independent failure domains.
105
type Server struct {
106
        injected int32 // To be used atomically.
107

108
        // Required by the grpc-gateway/v2 library for forward compatibility.
109
        UnimplementedChainNotifierServer
110
        UnimplementedChainKitServer
111

112
        stopped sync.Once
113

114
        cfg Config
115

116
        quit chan struct{}
117
}
118

119
// New returns a new instance of the chainrpc ChainNotifier sub-server. We also
120
// return the set of permissions for the macaroons that we may create within
121
// this method. If the macaroons we need aren't found in the filepath, then
122
// we'll create them on start up. If we're unable to locate, or create the
123
// macaroons we need, then we'll return with an error.
124
func New() (*Server, lnrpc.MacaroonPerms, error) {
3✔
125
        return &Server{
3✔
126
                cfg:  Config{},
3✔
127
                quit: make(chan struct{}),
3✔
128
        }, macPermissions, nil
3✔
129
}
3✔
130

131
// Compile-time checks to ensure that Server fully implements the
132
// ChainNotifierServer gRPC service, ChainKitServer gRPC service, and
133
// lnrpc.SubServer interface.
134
var _ ChainNotifierServer = (*Server)(nil)
135
var _ ChainKitServer = (*Server)(nil)
136
var _ lnrpc.SubServer = (*Server)(nil)
137

138
// Stop signals any active goroutines for a graceful closure.
139
//
140
// NOTE: This is part of the lnrpc.SubServer interface.
141
func (s *Server) Stop() error {
3✔
142
        s.stopped.Do(func() {
6✔
143
                close(s.quit)
3✔
144
        })
3✔
145

146
        return nil
3✔
147
}
148

149
// InjectDependencies populates the sub-server's dependencies. If the
150
// finalizeDependencies boolean is true, then the sub-server will finalize its
151
// dependencies and return an error if any required dependencies are missing.
152
//
153
// NOTE: This is part of the lnrpc.SubServer interface.
154
func (s *Server) InjectDependencies(
155
        configRegistry lnrpc.SubServerConfigDispatcher,
156
        finalizeDependencies bool) error {
3✔
157

3✔
158
        if finalizeDependencies && atomic.AddInt32(&s.injected, 1) != 1 {
3✔
NEW
159
                return lnrpc.ErrDependenciesFinalized
×
NEW
160
        }
×
161

162
        cfg, err := getConfig(configRegistry, finalizeDependencies)
3✔
163
        if err != nil {
3✔
NEW
164
                return err
×
NEW
165
        }
×
166

167
        if finalizeDependencies {
6✔
168
                s.cfg = *cfg
3✔
169

3✔
170
                return nil
3✔
171
        }
3✔
172

173
        // If the path of the chain notifier macaroon wasn't generated, then
174
        // we'll assume that it's found at the default network directory.
UNCOV
175
        if cfg.ChainNotifierMacPath == "" {
×
UNCOV
176
                cfg.ChainNotifierMacPath = filepath.Join(
×
UNCOV
177
                        cfg.NetworkDir, DefaultChainNotifierMacFilename,
×
UNCOV
178
                )
×
UNCOV
179
        }
×
180

181
        // Now that we know the full path of the chain notifier macaroon, we can
182
        // check to see if we need to create it or not. If stateless_init is set
183
        // then we don't write the macaroons.
UNCOV
184
        macFilePath := cfg.ChainNotifierMacPath
×
UNCOV
185
        if cfg.MacService != nil && !cfg.MacService.StatelessInit &&
×
UNCOV
186
                !lnrpc.FileExists(macFilePath) {
×
UNCOV
187

×
NEW
188
                log.Infof("Baking macaroons for ChainNotifier RPC Server "+
×
NEW
189
                        "at: %v", macFilePath)
×
UNCOV
190

×
UNCOV
191
                // At this point, we know that the chain notifier macaroon
×
UNCOV
192
                // doesn't yet exist, so we need to create it with the help of
×
UNCOV
193
                // the main macaroon service.
×
UNCOV
194
                chainNotifierMac, err := cfg.MacService.NewMacaroon(
×
UNCOV
195
                        context.Background(), macaroons.DefaultRootKeyID,
×
UNCOV
196
                        macaroonOps...,
×
UNCOV
197
                )
×
UNCOV
198
                if err != nil {
×
NEW
199
                        return err
×
200
                }
×
UNCOV
201
                chainNotifierMacBytes, err := chainNotifierMac.M().MarshalBinary()
×
UNCOV
202
                if err != nil {
×
NEW
203
                        return err
×
204
                }
×
UNCOV
205
                err = os.WriteFile(macFilePath, chainNotifierMacBytes, 0644)
×
UNCOV
206
                if err != nil {
×
207
                        _ = os.Remove(macFilePath)
×
NEW
208
                        return err
×
209
                }
×
210
        }
211

NEW
212
        s.cfg = *cfg
×
UNCOV
213

×
UNCOV
214
        return nil
×
215
}
216

217
// Name returns a unique string representation of the sub-server. This can be
218
// used to identify the sub-server and also de-duplicate them.
219
//
220
// NOTE: This is part of the lnrpc.SubServer interface.
221
func (s *Server) Name() string {
3✔
222
        return subServerName
3✔
223
}
3✔
224

225
// RegisterWithRootServer will be called by the root gRPC server to direct a RPC
226
// sub-server to register itself with the main gRPC root server. Until this is
227
// called, each sub-server won't be able to have requests routed towards it.
228
//
229
// NOTE: This is part of the lnrpc.GrpcHandler interface.
230
func (r *ServerShell) RegisterWithRootServer(grpcServer *grpc.Server) error {
3✔
231
        // We make sure that we register it with the main gRPC server to ensure
3✔
232
        // all our methods are routed properly.
3✔
233
        RegisterChainNotifierServer(grpcServer, r)
3✔
234
        log.Debug("ChainNotifier RPC server successfully register with root " +
3✔
235
                "gRPC server")
3✔
236

3✔
237
        RegisterChainKitServer(grpcServer, r)
3✔
238
        log.Debug("ChainKit RPC server successfully register with root gRPC " +
3✔
239
                "server")
3✔
240

3✔
241
        return nil
3✔
242
}
3✔
243

244
// RegisterWithRestServer will be called by the root REST mux to direct a sub
245
// RPC server to register itself with the main REST mux server. Until this is
246
// called, each sub-server won't be able to have requests routed towards it.
247
//
248
// NOTE: This is part of the lnrpc.GrpcHandler interface.
249
func (r *ServerShell) RegisterWithRestServer(ctx context.Context,
250
        mux *runtime.ServeMux, dest string, opts []grpc.DialOption) error {
3✔
251

3✔
252
        // We make sure that we register it with the main REST server to ensure
3✔
253
        // all our methods are routed properly.
3✔
254
        err := RegisterChainNotifierHandlerFromEndpoint(ctx, mux, dest, opts)
3✔
255
        if err != nil {
3✔
256
                log.Errorf("Could not register ChainNotifier REST server "+
×
257
                        "with root REST server: %v", err)
×
258
                return err
×
259
        }
×
260

261
        log.Debugf("ChainNotifier REST server successfully registered with " +
3✔
262
                "root REST server")
3✔
263

3✔
264
        // Register chainkit with the main REST server to ensure all our methods
3✔
265
        // are routed properly.
3✔
266
        err = RegisterChainKitHandlerFromEndpoint(ctx, mux, dest, opts)
3✔
267
        if err != nil {
3✔
268
                log.Errorf("Could not register ChainKit REST server with root "+
×
269
                        "REST server: %v", err)
×
270
                return err
×
271
        }
×
272
        log.Debugf("ChainKit REST server successfully registered with root " +
3✔
273
                "REST server")
3✔
274

3✔
275
        return nil
3✔
276
}
277

278
// CreateSubServer creates an instance of the sub-server, and returns the
279
// macaroon permissions that the sub-server wishes to pass on to the root server
280
// for all methods routed towards it.
281
//
282
// NOTE: This is part of the lnrpc.GrpcHandler interface.
283
func (r *ServerShell) CreateSubServer() (
284
        lnrpc.SubServer, lnrpc.MacaroonPerms, error) {
3✔
285

3✔
286
        subServer, macPermissions, err := New()
3✔
287
        if err != nil {
3✔
288
                return nil, nil, err
×
289
        }
×
290

291
        r.ChainNotifierServer = subServer
3✔
292
        r.ChainKitServer = subServer
3✔
293
        return subServer, macPermissions, nil
3✔
294
}
295

296
// GetBlock returns a block given the corresponding block hash.
297
func (s *Server) GetBlock(_ context.Context,
298
        in *GetBlockRequest) (*GetBlockResponse, error) {
3✔
299

3✔
300
        // We'll start by reconstructing the RPC request into what the
3✔
301
        // underlying chain functionality expects.
3✔
302
        var blockHash chainhash.Hash
3✔
303
        copy(blockHash[:], in.BlockHash)
3✔
304

3✔
305
        block, err := s.cfg.Chain.GetBlock(&blockHash)
3✔
306
        if err != nil {
3✔
307
                return nil, err
×
308
        }
×
309

310
        // Serialize block for RPC response.
311
        var blockBuf bytes.Buffer
3✔
312
        err = block.Serialize(&blockBuf)
3✔
313
        if err != nil {
3✔
314
                return nil, err
×
315
        }
×
316
        rawBlock := blockBuf.Bytes()
3✔
317

3✔
318
        return &GetBlockResponse{RawBlock: rawBlock}, nil
3✔
319
}
320

321
// GetBlockHeader returns a block header given the corresponding block hash.
322
func (s *Server) GetBlockHeader(_ context.Context,
323
        in *GetBlockHeaderRequest) (*GetBlockHeaderResponse, error) {
3✔
324

3✔
325
        // We'll start by reconstructing the RPC request into what the
3✔
326
        // underlying chain functionality expects.
3✔
327
        var blockHash chainhash.Hash
3✔
328
        copy(blockHash[:], in.BlockHash)
3✔
329

3✔
330
        blockHeader, err := s.cfg.Chain.GetBlockHeader(&blockHash)
3✔
331
        if err != nil {
3✔
332
                return nil, err
×
333
        }
×
334

335
        // Serialize block header for RPC response.
336
        var headerBuf bytes.Buffer
3✔
337
        err = blockHeader.Serialize(&headerBuf)
3✔
338
        if err != nil {
3✔
339
                return nil, err
×
340
        }
×
341
        rawHeader := headerBuf.Bytes()
3✔
342

3✔
343
        return &GetBlockHeaderResponse{RawBlockHeader: rawHeader}, nil
3✔
344
}
345

346
// GetBestBlock returns the latest block hash and current height of the valid
347
// most-work chain.
348
func (s *Server) GetBestBlock(_ context.Context,
349
        _ *GetBestBlockRequest) (*GetBestBlockResponse, error) {
3✔
350

3✔
351
        blockHash, blockHeight, err := s.cfg.Chain.GetBestBlock()
3✔
352
        if err != nil {
3✔
353
                return nil, err
×
354
        }
×
355

356
        return &GetBestBlockResponse{
3✔
357
                BlockHash:   blockHash[:],
3✔
358
                BlockHeight: blockHeight,
3✔
359
        }, nil
3✔
360
}
361

362
// GetBlockHash returns the hash of the block in the best blockchain
363
// at the given height.
364
func (s *Server) GetBlockHash(_ context.Context,
365
        req *GetBlockHashRequest) (*GetBlockHashResponse, error) {
3✔
366

3✔
367
        blockHash, err := s.cfg.Chain.GetBlockHash(req.BlockHeight)
3✔
368
        if err != nil {
3✔
369
                return nil, err
×
370
        }
×
371

372
        return &GetBlockHashResponse{
3✔
373
                BlockHash: blockHash[:],
3✔
374
        }, nil
3✔
375
}
376

377
// RegisterConfirmationsNtfn is a synchronous response-streaming RPC that
378
// registers an intent for a client to be notified once a confirmation request
379
// has reached its required number of confirmations on-chain.
380
//
381
// A client can specify whether the confirmation request should be for a
382
// particular transaction by its hash or for an output script by specifying a
383
// zero hash.
384
//
385
// NOTE: This is part of the chainrpc.ChainNotifierServer interface.
386
func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest,
387
        confStream ChainNotifier_RegisterConfirmationsNtfnServer) error {
3✔
388

3✔
389
        if !s.cfg.ChainNotifier.Started() {
3✔
390
                return ErrChainNotifierServerNotActive
×
391
        }
×
392

393
        // We'll start by reconstructing the RPC request into what the
394
        // underlying ChainNotifier expects.
395
        var txid chainhash.Hash
3✔
396
        copy(txid[:], in.Txid)
3✔
397

3✔
398
        var opts []chainntnfs.NotifierOption
3✔
399
        if in.IncludeBlock {
6✔
400
                opts = append(opts, chainntnfs.WithIncludeBlock())
3✔
401
        }
3✔
402

403
        // We'll then register for the confirmation notification of the request.
404
        confEvent, err := s.cfg.ChainNotifier.RegisterConfirmationsNtfn(
3✔
405
                &txid, in.Script, in.NumConfs, in.HeightHint, opts...,
3✔
406
        )
3✔
407
        if err != nil {
3✔
408
                return err
×
409
        }
×
410
        defer confEvent.Cancel()
3✔
411

3✔
412
        // With the request registered, we'll wait for its confirmation
3✔
413
        // notification to be dispatched.
3✔
414
        for {
6✔
415
                select {
3✔
416
                // The transaction satisfying the request has confirmed on-chain
417
                // and reached its required number of confirmations. We'll
418
                // dispatch an event to the caller indicating so.
419
                case details, ok := <-confEvent.Confirmed:
3✔
420
                        if !ok {
3✔
421
                                return chainntnfs.ErrChainNotifierShuttingDown
×
422
                        }
×
423

424
                        var rawTxBuf bytes.Buffer
3✔
425
                        err := details.Tx.Serialize(&rawTxBuf)
3✔
426
                        if err != nil {
3✔
427
                                return err
×
428
                        }
×
429

430
                        // If the block was included (should only be there if
431
                        // IncludeBlock is true), then we'll encode the bytes
432
                        // to send with the response.
433
                        var blockBytes []byte
3✔
434
                        if details.Block != nil {
6✔
435
                                var blockBuf bytes.Buffer
3✔
436
                                err := details.Block.Serialize(&blockBuf)
3✔
437
                                if err != nil {
3✔
438
                                        return err
×
439
                                }
×
440

441
                                blockBytes = blockBuf.Bytes()
3✔
442
                        }
443

444
                        rpcConfDetails := &ConfDetails{
3✔
445
                                RawTx:       rawTxBuf.Bytes(),
3✔
446
                                BlockHash:   details.BlockHash[:],
3✔
447
                                BlockHeight: details.BlockHeight,
3✔
448
                                TxIndex:     details.TxIndex,
3✔
449
                                RawBlock:    blockBytes,
3✔
450
                        }
3✔
451

3✔
452
                        conf := &ConfEvent{
3✔
453
                                Event: &ConfEvent_Conf{
3✔
454
                                        Conf: rpcConfDetails,
3✔
455
                                },
3✔
456
                        }
3✔
457
                        if err := confStream.Send(conf); err != nil {
3✔
458
                                return err
×
459
                        }
×
460

461
                // The transaction satisfying the request has been reorged out
462
                // of the chain, so we'll send an event describing it.
463
                case _, ok := <-confEvent.NegativeConf:
×
464
                        if !ok {
×
465
                                return chainntnfs.ErrChainNotifierShuttingDown
×
466
                        }
×
467

468
                        reorg := &ConfEvent{
×
469
                                Event: &ConfEvent_Reorg{Reorg: &Reorg{}},
×
470
                        }
×
471
                        if err := confStream.Send(reorg); err != nil {
×
472
                                return err
×
473
                        }
×
474

475
                // The transaction satisfying the request has confirmed and is
476
                // no longer under the risk of being reorged out of the chain,
477
                // so we can safely exit.
478
                case _, ok := <-confEvent.Done:
×
479
                        if !ok {
×
480
                                return chainntnfs.ErrChainNotifierShuttingDown
×
481
                        }
×
482

483
                        return nil
×
484

485
                // The response stream's context for whatever reason has been
486
                // closed. If context is closed by an exceeded deadline we will
487
                // return an error.
488
                case <-confStream.Context().Done():
3✔
489
                        if errors.Is(confStream.Context().Err(), context.Canceled) {
6✔
490
                                return nil
3✔
491
                        }
3✔
492
                        return confStream.Context().Err()
×
493

494
                // The server has been requested to shut down.
495
                case <-s.quit:
×
496
                        return ErrChainNotifierServerShuttingDown
×
497
                }
498
        }
499
}
500

501
// RegisterSpendNtfn is a synchronous response-streaming RPC that registers an
502
// intent for a client to be notified once a spend request has been spent by
503
// a transaction that has confirmed on-chain.
504
//
505
// A client can specify whether the spend request should be for a particular
506
// outpoint or for an output script by specifying a zero outpoint.
507
//
508
// NOTE: This is part of the chainrpc.ChainNotifierServer interface.
509
func (s *Server) RegisterSpendNtfn(in *SpendRequest,
510
        spendStream ChainNotifier_RegisterSpendNtfnServer) error {
3✔
511

3✔
512
        if !s.cfg.ChainNotifier.Started() {
3✔
513
                return ErrChainNotifierServerNotActive
×
514
        }
×
515

516
        // We'll start by reconstructing the RPC request into what the
517
        // underlying ChainNotifier expects.
518
        var op *wire.OutPoint
3✔
519
        if in.Outpoint != nil {
6✔
520
                var txid chainhash.Hash
3✔
521
                copy(txid[:], in.Outpoint.Hash)
3✔
522
                op = &wire.OutPoint{Hash: txid, Index: in.Outpoint.Index}
3✔
523
        }
3✔
524

525
        // We'll then register for the spend notification of the request.
526
        spendEvent, err := s.cfg.ChainNotifier.RegisterSpendNtfn(
3✔
527
                op, in.Script, in.HeightHint,
3✔
528
        )
3✔
529
        if err != nil {
6✔
530
                return err
3✔
531
        }
3✔
532
        defer spendEvent.Cancel()
3✔
533

3✔
534
        // With the request registered, we'll wait for its spend notification to
3✔
535
        // be dispatched.
3✔
536
        for {
6✔
537
                select {
3✔
538
                // A transaction that spends the request has confirmed on-chain.
539
                // We'll return an event to the caller indicating so that
540
                // includes the details of the spending transaction.
541
                case details, ok := <-spendEvent.Spend:
3✔
542
                        if !ok {
3✔
543
                                return chainntnfs.ErrChainNotifierShuttingDown
×
544
                        }
×
545

546
                        var rawSpendingTxBuf bytes.Buffer
3✔
547
                        err := details.SpendingTx.Serialize(&rawSpendingTxBuf)
3✔
548
                        if err != nil {
3✔
549
                                return err
×
550
                        }
×
551

552
                        rpcSpendDetails := &SpendDetails{
3✔
553
                                SpendingOutpoint: &Outpoint{
3✔
554
                                        Hash:  details.SpentOutPoint.Hash[:],
3✔
555
                                        Index: details.SpentOutPoint.Index,
3✔
556
                                },
3✔
557
                                RawSpendingTx:      rawSpendingTxBuf.Bytes(),
3✔
558
                                SpendingTxHash:     details.SpenderTxHash[:],
3✔
559
                                SpendingInputIndex: details.SpenderInputIndex,
3✔
560
                                SpendingHeight:     uint32(details.SpendingHeight),
3✔
561
                        }
3✔
562

3✔
563
                        spend := &SpendEvent{
3✔
564
                                Event: &SpendEvent_Spend{
3✔
565
                                        Spend: rpcSpendDetails,
3✔
566
                                },
3✔
567
                        }
3✔
568
                        if err := spendStream.Send(spend); err != nil {
3✔
569
                                return err
×
570
                        }
×
571

572
                // The request's spending transaction has been reorged out of
573
                // the chain. We'll return an event to the caller indicating so.
574
                case _, ok := <-spendEvent.Reorg:
×
575
                        if !ok {
×
576
                                return chainntnfs.ErrChainNotifierShuttingDown
×
577
                        }
×
578

579
                        reorg := &SpendEvent{
×
580
                                Event: &SpendEvent_Reorg{Reorg: &Reorg{}},
×
581
                        }
×
582
                        if err := spendStream.Send(reorg); err != nil {
×
583
                                return err
×
584
                        }
×
585

586
                // The spending transaction of the request has confirmed
587
                // on-chain and is no longer under the risk of being reorged out
588
                // of the chain, so we can safely exit.
589
                case _, ok := <-spendEvent.Done:
×
590
                        if !ok {
×
591
                                return chainntnfs.ErrChainNotifierShuttingDown
×
592
                        }
×
593

594
                        return nil
×
595

596
                // The response stream's context for whatever reason has been
597
                // closed. If context is closed by an exceeded deadline we will
598
                // return an error.
599
                case <-spendStream.Context().Done():
3✔
600
                        if errors.Is(spendStream.Context().Err(), context.Canceled) {
6✔
601
                                return nil
3✔
602
                        }
3✔
603
                        return spendStream.Context().Err()
×
604

605
                // The server has been requested to shut down.
606
                case <-s.quit:
×
607
                        return ErrChainNotifierServerShuttingDown
×
608
                }
609
        }
610
}
611

612
// RegisterBlockEpochNtfn is a synchronous response-streaming RPC that registers
613
// an intent for a client to be notified of blocks in the chain. The stream will
614
// return a hash and height tuple of a block for each new/stale block in the
615
// chain. It is the client's responsibility to determine whether the tuple
616
// returned is for a new or stale block in the chain.
617
//
618
// A client can also request a historical backlog of blocks from a particular
619
// point. This allows clients to be idempotent by ensuring that they do not
620
// miss processing a single block within the chain.
621
//
622
// NOTE: This is part of the chainrpc.ChainNotifierServer interface.
623
func (s *Server) RegisterBlockEpochNtfn(in *BlockEpoch,
624
        epochStream ChainNotifier_RegisterBlockEpochNtfnServer) error {
3✔
625

3✔
626
        if !s.cfg.ChainNotifier.Started() {
3✔
627
                return ErrChainNotifierServerNotActive
×
628
        }
×
629

630
        // We'll start by reconstructing the RPC request into what the
631
        // underlying ChainNotifier expects.
632
        var hash chainhash.Hash
3✔
633
        copy(hash[:], in.Hash)
3✔
634

3✔
635
        // If the request carries a non-zero hash and a non-zero height, we
3✔
636
        // should deliver a backlog of notifications from the given block
3✔
637
        // (hash/height tuple) until tip, and continue delivering epochs for
3✔
638
        // new blocks.
3✔
639
        var blockEpoch *chainntnfs.BlockEpoch
3✔
640
        if hash != chainntnfs.ZeroHash && in.Height != 0 {
6✔
641
                blockEpoch = &chainntnfs.BlockEpoch{
3✔
642
                        Hash:   &hash,
3✔
643
                        Height: int32(in.Height),
3✔
644
                }
3✔
645
        }
3✔
646

647
        epochEvent, err := s.cfg.ChainNotifier.RegisterBlockEpochNtfn(blockEpoch)
3✔
648
        if err != nil {
3✔
649
                return err
×
650
        }
×
651
        defer epochEvent.Cancel()
3✔
652

3✔
653
        for {
6✔
654
                select {
3✔
655
                // A notification for a block has been received. This block can
656
                // either be a new block or stale.
657
                case blockEpoch, ok := <-epochEvent.Epochs:
3✔
658
                        if !ok {
3✔
659
                                return chainntnfs.ErrChainNotifierShuttingDown
×
660
                        }
×
661

662
                        epoch := &BlockEpoch{
3✔
663
                                Hash:   blockEpoch.Hash[:],
3✔
664
                                Height: uint32(blockEpoch.Height),
3✔
665
                        }
3✔
666
                        if err := epochStream.Send(epoch); err != nil {
3✔
667
                                return err
×
668
                        }
×
669

670
                // The response stream's context for whatever reason has been
671
                // closed. If context is closed by an exceeded deadline we will
672
                // return an error.
673
                case <-epochStream.Context().Done():
3✔
674
                        if errors.Is(epochStream.Context().Err(), context.Canceled) {
6✔
675
                                return nil
3✔
676
                        }
3✔
677
                        return epochStream.Context().Err()
×
678

679
                // The server has been requested to shut down.
680
                case <-s.quit:
×
681
                        return ErrChainNotifierServerShuttingDown
×
682
                }
683
        }
684
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc