• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 17190810624

24 Aug 2025 03:58PM UTC coverage: 66.74% (+9.4%) from 57.321%
17190810624

Pull #10167

github

web-flow
Merge 33cec4f6a into 0c2f045f5
Pull Request #10167: multi: bump Go to 1.24.6

6 of 40 new or added lines in 10 files covered. (15.0%)

12 existing lines in 6 files now uncovered.

135947 of 203696 relevant lines covered (66.74%)

21470.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/lntest/node/harness_node.go
1
package node
2

3
import (
4
        "bytes"
5
        "context"
6
        "crypto/rand"
7
        "encoding/hex"
8
        "encoding/json"
9
        "fmt"
10
        "io"
11
        "os"
12
        "os/exec"
13
        "path/filepath"
14
        "strings"
15
        "testing"
16
        "time"
17

18
        "github.com/jackc/pgx/v4/pgxpool"
19
        "github.com/lightningnetwork/lnd"
20
        "github.com/lightningnetwork/lnd/lnrpc"
21
        "github.com/lightningnetwork/lnd/lntest/rpc"
22
        "github.com/lightningnetwork/lnd/lntest/wait"
23
        "github.com/lightningnetwork/lnd/macaroons"
24
        "google.golang.org/grpc"
25
        "google.golang.org/grpc/codes"
26
        "google.golang.org/grpc/credentials"
27
        "google.golang.org/grpc/status"
28
        "gopkg.in/macaroon.v2"
29
)
30

31
const (
	// logPubKeyBytes is the number of bytes of the node's PubKey that will
	// be appended to the log file name. The whole PubKey is too long and
	// not really necessary to quickly identify what node produced which
	// log file.
	logPubKeyBytes = 4

	// trickleDelay is the amount of time in milliseconds between each
	// release of announcements by AuthenticatedGossiper to the network.
	trickleDelay = 50

	// postgresDsn is the DSN template used to connect to the local
	// Postgres instance; the single %s placeholder is filled in with the
	// per-node database name.
	postgresDsn = "postgres://postgres:postgres@localhost:" +
		"6432/%s?sslmode=disable"

	// commitInterval specifies the maximum interval the graph database
	// will wait between attempting to flush a batch of modifications to
	// disk(db.batch-commit-interval).
	commitInterval = 10 * time.Millisecond
)
50

51
// HarnessNode represents an instance of lnd running within our test network
// harness. It's responsible for managing the lnd process, grpc connection, and
// wallet auth. A HarnessNode is built upon its rpc clients, represented in
// `HarnessRPC`. It also has a `State` which holds its internal state, and a
// `Watcher` that keeps track of its topology updates.
type HarnessNode struct {
	*testing.T

	// Cfg holds the config values for the node.
	Cfg *BaseNodeConfig

	// RPC holds a list of RPC clients.
	RPC *rpc.HarnessRPC

	// State records the current state of the node.
	State *State

	// Watcher watches the node's topology updates.
	Watcher *nodeWatcher

	// PubKey is the serialized compressed identity public key of the node.
	// This field will only be populated once the node itself has been
	// started via the start() method. PubKeyStr holds the same key as its
	// original hex string.
	PubKey    [33]byte
	PubKeyStr string

	// conn is the underlying connection to the grpc endpoint of the node.
	conn *grpc.ClientConn

	// runCtx is a context with cancel method. It's used to signal when the
	// node needs to quit, and used as the parent context when spawning
	// children contexts for RPC requests.
	runCtx context.Context //nolint:containedctx
	cancel context.CancelFunc

	// filename is the log file's name.
	filename string

	// cmd is the running lnd process. logFile is the open handle node
	// output is written to; it is nil when log output is disabled.
	cmd     *exec.Cmd
	logFile *os.File
}
92

93
// NewHarnessNode creates a new test lightning node instance from the passed
94
// config.
95
func NewHarnessNode(t *testing.T, cfg *BaseNodeConfig) (*HarnessNode, error) {
×
96
        if err := cfg.GenBaseDir(); err != nil {
×
97
                return nil, err
×
98
        }
×
99

100
        cfg.DataDir = filepath.Join(cfg.BaseDir, "data")
×
101
        cfg.LogDir = filepath.Join(cfg.BaseDir, "logs")
×
102
        cfg.TLSCertPath = filepath.Join(cfg.BaseDir, "tls.cert")
×
103
        cfg.TLSKeyPath = filepath.Join(cfg.BaseDir, "tls.key")
×
104

×
105
        networkDir := filepath.Join(
×
106
                cfg.DataDir, "chain", lnd.BitcoinChainName, cfg.NetParams.Name,
×
107
        )
×
108
        cfg.AdminMacPath = filepath.Join(networkDir, "admin.macaroon")
×
109
        cfg.ReadMacPath = filepath.Join(networkDir, "readonly.macaroon")
×
110
        cfg.InvoiceMacPath = filepath.Join(networkDir, "invoice.macaroon")
×
111

×
112
        cfg.GenerateListeningPorts()
×
113

×
114
        // Create temporary database.
×
115
        var dbName string
×
116
        if cfg.DBBackend == BackendPostgres {
×
117
                var err error
×
NEW
118
                dbName, err = createTempPgDB(t.Context())
×
119
                if err != nil {
×
120
                        return nil, err
×
121
                }
×
122
                cfg.PostgresDsn = postgresDatabaseDsn(dbName)
×
123
        }
124

125
        cfg.OriginalExtraArgs = cfg.ExtraArgs
×
126
        cfg.postgresDBName = dbName
×
127

×
128
        return &HarnessNode{
×
129
                T:   t,
×
130
                Cfg: cfg,
×
131
        }, nil
×
132
}
133

134
// Initialize creates a list of new RPC clients using the passed connection,
135
// initializes the node's internal state and creates a topology watcher.
136
func (hn *HarnessNode) Initialize(c *grpc.ClientConn) {
×
137
        hn.conn = c
×
138

×
139
        // Init all the rpc clients.
×
140
        hn.RPC = rpc.NewHarnessRPC(hn.runCtx, hn.T, c, hn.Name())
×
141

×
142
        // Init the node's state.
×
143
        //
×
144
        // If we already have a state, it means we are restarting the node and
×
145
        // we will only reset its internal states. Otherwise we'll create a new
×
146
        // state.
×
147
        if hn.State != nil {
×
148
                hn.State.resetEphermalStates(hn.RPC)
×
149
        } else {
×
150
                hn.State = newState(hn.RPC)
×
151
        }
×
152

153
        // Init the topology watcher.
154
        hn.Watcher = newNodeWatcher(hn.RPC, hn.State)
×
155
}
156

157
// Name returns the name of this node set during initialization. The name is
// taken straight from the node's base config.
func (hn *HarnessNode) Name() string {
	return hn.Cfg.Name
}
×
161

162
// UpdateState updates the node's internal state. The refresh itself is
// delegated to the underlying State object.
func (hn *HarnessNode) UpdateState() {
	hn.State.updateState()
}
×
166

167
// String gives the internal state of the node which is useful for debugging.
func (hn *HarnessNode) String() string {
	// nodeCfg mirrors the subset of the node's config that is useful to
	// show in debug output.
	type nodeCfg struct {
		LogFilenamePrefix string
		ExtraArgs         []string
		SkipUnlock        bool
		Password          []byte
		P2PPort           int
		RPCPort           int
		RESTPort          int
		AcceptKeySend     bool
		FeeURL            string
	}

	// Snapshot of the node that is serialized below. Note that
	// AcceptKeySend and FeeURL are not populated here and therefore always
	// render as their zero values.
	nodeState := struct {
		NodeID  uint32
		Name    string
		PubKey  string
		State   *State
		NodeCfg nodeCfg
	}{
		NodeID: hn.Cfg.NodeID,
		Name:   hn.Cfg.Name,
		PubKey: hn.PubKeyStr,
		State:  hn.State,
		NodeCfg: nodeCfg{
			SkipUnlock:        hn.Cfg.SkipUnlock,
			Password:          hn.Cfg.Password,
			LogFilenamePrefix: hn.Cfg.LogFilenamePrefix,
			ExtraArgs:         hn.Cfg.ExtraArgs,
			P2PPort:           hn.Cfg.P2PPort,
			RPCPort:           hn.Cfg.RPCPort,
			RESTPort:          hn.Cfg.RESTPort,
		},
	}

	stateBytes, err := json.MarshalIndent(nodeState, "", "\t")
	if err != nil {
		// Serialization failure is folded into the returned string so
		// callers never see an error from String().
		return fmt.Sprintf("\n encode node state with err: %v", err)
	}

	return fmt.Sprintf("\nnode state: %s", stateBytes)
}
210

211
// WaitUntilStarted waits until the wallet state flips from "WAITING_TO_START".
212
func (hn *HarnessNode) WaitUntilStarted() error {
×
213
        return hn.waitTillServerState(func(s lnrpc.WalletState) bool {
×
214
                return s != lnrpc.WalletState_WAITING_TO_START
×
215
        })
×
216
}
217

218
// WaitUntilServerActive waits until the lnd daemon is fully started.
219
func (hn *HarnessNode) WaitUntilServerActive() error {
×
220
        return hn.waitTillServerState(func(s lnrpc.WalletState) bool {
×
221
                return s == lnrpc.WalletState_SERVER_ACTIVE
×
222
        })
×
223
}
224

225
// WaitUntilLeader attempts to finish the start procedure by initiating an RPC
226
// connection and setting up the wallet unlocker client. This is needed when
227
// a node that has recently been started was waiting to become the leader and
228
// we're at the point when we expect that it is the leader now (awaiting
229
// unlock).
230
func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error {
×
231
        var (
×
232
                conn    *grpc.ClientConn
×
233
                connErr error
×
234
        )
×
235

×
236
        if err := wait.NoError(func() error {
×
237
                conn, connErr = hn.ConnectRPCWithMacaroon(nil)
×
238
                return connErr
×
239
        }, timeout); err != nil {
×
240
                return err
×
241
        }
×
242

243
        // Since the conn is not authed, only the `WalletUnlocker` and `State`
244
        // clients can be inited from this conn.
245
        hn.conn = conn
×
246
        hn.RPC = rpc.NewHarnessRPC(hn.runCtx, hn.T, conn, hn.Name())
×
247

×
248
        // Wait till the server is starting.
×
249
        return hn.WaitUntilStarted()
×
250
}
251

252
// Unlock attempts to unlock the wallet of the target HarnessNode. This method
// should be called after the restart of a HarnessNode that was created with a
// seed+password. Once this method returns, the HarnessNode will be ready to
// accept normal gRPC requests and harness command.
func (hn *HarnessNode) Unlock(unlockReq *lnrpc.UnlockWalletRequest) error {
	// Send the unlock request over the (still unauthenticated) connection
	// so the node can finish starting up.
	hn.RPC.UnlockWallet(unlockReq)

	// Now that the wallet has been unlocked, we'll wait for the RPC client
	// to be ready, then establish the normal gRPC connection.
	return hn.InitNode(nil)
}
×
265

266
// AddToLogf adds a line of choice to the node's logfile. This is useful
267
// to interleave test output with output from the node.
268
func (hn *HarnessNode) AddToLogf(format string, a ...interface{}) {
×
269
        // If this node was not set up with a log file, just return early.
×
270
        if hn.logFile == nil {
×
271
                return
×
272
        }
×
273

274
        desc := fmt.Sprintf("itest: %s\n", fmt.Sprintf(format, a...))
×
275
        if _, err := hn.logFile.WriteString(desc); err != nil {
×
276
                hn.printErrf("write to log err: %v", err)
×
277
        }
×
278
}
279

280
// ReadMacaroon waits a given duration for the macaroon file to be created. If
281
// the file is readable within the timeout, its content is de-serialized as a
282
// macaroon and returned.
283
func (hn *HarnessNode) ReadMacaroon(macPath string, timeout time.Duration) (
284
        *macaroon.Macaroon, error) {
×
285

×
286
        // Wait until macaroon file is created and has valid content before
×
287
        // using it.
×
288
        var mac *macaroon.Macaroon
×
289
        err := wait.NoError(func() error {
×
290
                macBytes, err := os.ReadFile(macPath)
×
291
                if err != nil {
×
292
                        return fmt.Errorf("error reading macaroon file: %w",
×
293
                                err)
×
294
                }
×
295

296
                newMac := &macaroon.Macaroon{}
×
297
                if err = newMac.UnmarshalBinary(macBytes); err != nil {
×
298
                        return fmt.Errorf("error unmarshalling macaroon "+
×
299
                                "file: %w", err)
×
300
                }
×
301
                mac = newMac
×
302

×
303
                return nil
×
304
        }, timeout)
305

306
        return mac, err
×
307
}
308

309
// ConnectRPCWithMacaroon uses the TLS certificate and given macaroon to
310
// create a gRPC client connection.
311
func (hn *HarnessNode) ConnectRPCWithMacaroon(mac *macaroon.Macaroon) (
312
        *grpc.ClientConn, error) {
×
313

×
314
        // Wait until TLS certificate is created and has valid content before
×
315
        // using it, up to 30 sec.
×
316
        var tlsCreds credentials.TransportCredentials
×
317
        err := wait.NoError(func() error {
×
318
                var err error
×
319
                tlsCreds, err = credentials.NewClientTLSFromFile(
×
320
                        hn.Cfg.TLSCertPath, "",
×
321
                )
×
322
                return err
×
323
        }, wait.DefaultTimeout)
×
324
        if err != nil {
×
325
                return nil, fmt.Errorf("error reading TLS cert: %w", err)
×
326
        }
×
327

328
        opts := []grpc.DialOption{
×
329
                grpc.WithBlock(),
×
330
                grpc.WithTransportCredentials(tlsCreds),
×
331
        }
×
332

×
333
        ctx, cancel := context.WithTimeout(hn.runCtx, wait.DefaultTimeout)
×
334
        defer cancel()
×
335

×
336
        if mac == nil {
×
337
                return grpc.DialContext(ctx, hn.Cfg.RPCAddr(), opts...)
×
338
        }
×
339
        macCred, err := macaroons.NewMacaroonCredential(mac)
×
340
        if err != nil {
×
341
                return nil, fmt.Errorf("error cloning mac: %w", err)
×
342
        }
×
343
        opts = append(opts, grpc.WithPerRPCCredentials(macCred))
×
344

×
345
        return grpc.DialContext(ctx, hn.Cfg.RPCAddr(), opts...)
×
346
}
347

348
// ConnectRPC uses the TLS certificate and admin macaroon files written by the
349
// lnd node to create a gRPC client connection.
350
func (hn *HarnessNode) ConnectRPC() (*grpc.ClientConn, error) {
×
351
        // If we should use a macaroon, always take the admin macaroon as a
×
352
        // default.
×
353
        mac, err := hn.ReadMacaroon(hn.Cfg.AdminMacPath, wait.DefaultTimeout)
×
354
        if err != nil {
×
355
                return nil, err
×
356
        }
×
357

358
        return hn.ConnectRPCWithMacaroon(mac)
×
359
}
360

361
// SetExtraArgs assigns the ExtraArgs field for the node's configuration. The
// changes will take effect on restart, since the args are only read when the
// lnd command line is built.
func (hn *HarnessNode) SetExtraArgs(extraArgs []string) {
	hn.Cfg.ExtraArgs = extraArgs
}
×
366

367
// StartLndCmd handles the startup of lnd, creating log files, and possibly
368
// kills the process when needed.
369
func (hn *HarnessNode) StartLndCmd(ctxb context.Context) error {
×
370
        // Init the run context.
×
371
        hn.runCtx, hn.cancel = context.WithCancel(ctxb)
×
372

×
373
        args := hn.Cfg.GenArgs()
×
374
        hn.cmd = exec.Command(hn.Cfg.LndBinary, args...)
×
375

×
376
        // Redirect stderr output to buffer
×
377
        var errb bytes.Buffer
×
378
        hn.cmd.Stderr = &errb
×
379

×
380
        // If the logoutput flag is passed, redirect output from the nodes to
×
381
        // log files.
×
382
        if *logOutput {
×
383
                err := addLogFile(hn)
×
384
                if err != nil {
×
385
                        return err
×
386
                }
×
387
        }
388

389
        // Start the process.
390
        if err := hn.cmd.Start(); err != nil {
×
391
                return err
×
392
        }
×
393

394
        pid := hn.cmd.Process.Pid
×
395
        hn.T.Logf("Starting node (name=%v) with PID=%v", hn.Cfg.Name, pid)
×
396

×
397
        return nil
×
398
}
399

400
// StartWithNoAuth will start the lnd process, creates the grpc connection
401
// without macaroon auth, and waits until the server is reported as waiting to
402
// start.
403
//
404
// NOTE: caller needs to take extra step to create and unlock the wallet.
405
func (hn *HarnessNode) StartWithNoAuth(ctxt context.Context) error {
×
406
        // Start lnd process and prepare logs.
×
407
        if err := hn.StartLndCmd(ctxt); err != nil {
×
408
                return fmt.Errorf("start lnd error: %w", err)
×
409
        }
×
410

411
        // Create an unauthed connection.
412
        conn, err := hn.ConnectRPCWithMacaroon(nil)
×
413
        if err != nil {
×
414
                return fmt.Errorf("ConnectRPCWithMacaroon err: %w", err)
×
415
        }
×
416

417
        // Since the conn is not authed, only the `WalletUnlocker` and `State`
418
        // clients can be inited from this conn.
419
        hn.conn = conn
×
420
        hn.RPC = rpc.NewHarnessRPC(hn.runCtx, hn.T, conn, hn.Name())
×
421

×
422
        // Wait till the server is starting.
×
423
        return hn.WaitUntilStarted()
×
424
}
425

426
// Start will start the lnd process, creates the grpc connection, and waits
427
// until the server is fully started.
428
func (hn *HarnessNode) Start(ctxt context.Context) error {
×
429
        // Start lnd process and prepare logs.
×
430
        if err := hn.StartLndCmd(ctxt); err != nil {
×
431
                return fmt.Errorf("start lnd error: %w", err)
×
432
        }
×
433

434
        // Since Stop uses the LightningClient to stop the node, if we fail to
435
        // get a connected client, we have to kill the process.
436
        conn, err := hn.ConnectRPC()
×
437
        if err != nil {
×
438
                err = fmt.Errorf("ConnectRPC err: %w", err)
×
439
                cmdErr := hn.Kill()
×
440
                if cmdErr != nil {
×
441
                        err = fmt.Errorf("kill process got err: %w: %v",
×
442
                                cmdErr, err)
×
443
                }
×
444
                return err
×
445
        }
446

447
        // Init the node by creating the RPC clients, initializing node's
448
        // internal state and watcher.
449
        hn.Initialize(conn)
×
450

×
451
        // Wait till the server is starting.
×
452
        if err := hn.WaitUntilStarted(); err != nil {
×
453
                return fmt.Errorf("waiting for start got: %w", err)
×
454
        }
×
455

456
        // Subscribe for topology updates.
457
        return hn.initLightningClient()
×
458
}
459

460
// InitNode waits until the main gRPC server is detected as active, then
461
// complete the normal HarnessNode gRPC connection creation. A non-nil
462
// `macBytes` indicates the node is initialized stateless, otherwise it will
463
// use the admin macaroon.
464
func (hn *HarnessNode) InitNode(macBytes []byte) error {
×
465
        var (
×
466
                conn *grpc.ClientConn
×
467
                err  error
×
468
        )
×
469

×
470
        // If the node has been initialized stateless, we need to pass the
×
471
        // macaroon to the client.
×
472
        if macBytes != nil {
×
473
                adminMac := &macaroon.Macaroon{}
×
474
                err := adminMac.UnmarshalBinary(macBytes)
×
475
                if err != nil {
×
476
                        return fmt.Errorf("unmarshal failed: %w", err)
×
477
                }
×
478
                conn, err = hn.ConnectRPCWithMacaroon(adminMac)
×
479
                if err != nil {
×
480
                        return err
×
481
                }
×
482
        } else {
×
483
                // Normal initialization, we expect a macaroon to be in the
×
484
                // file system.
×
485
                conn, err = hn.ConnectRPC()
×
486
                if err != nil {
×
487
                        return err
×
488
                }
×
489
        }
490

491
        // Init the node by creating the RPC clients, initializing node's
492
        // internal state and watcher.
493
        hn.Initialize(conn)
×
494

×
495
        // Wait till the server is starting.
×
496
        if err := hn.WaitUntilStarted(); err != nil {
×
497
                return fmt.Errorf("waiting for start got: %w", err)
×
498
        }
×
499

500
        return hn.initLightningClient()
×
501
}
502

503
// ChangePasswordAndInit initializes a harness node by passing the change
// password request via RPC. After the request is submitted, this method will
// block until a macaroon-authenticated RPC connection can be established to
// the harness node. Once established, the new connection is used to initialize
// the RPC clients and subscribes the HarnessNode to topology changes.
func (hn *HarnessNode) ChangePasswordAndInit(
	req *lnrpc.ChangePasswordRequest) (
	*lnrpc.ChangePasswordResponse, error) {

	// The returned AdminMacaroon is forwarded to InitNode, which treats a
	// non-nil value as a stateless initialization.
	response := hn.RPC.ChangePassword(req)
	return response, hn.InitNode(response.AdminMacaroon)
}
×
515

516
// waitTillServerState makes a subscription to the server's state change and
517
// blocks until the server is in the targeted state.
518
func (hn *HarnessNode) waitTillServerState(
519
        predicate func(state lnrpc.WalletState) bool) error {
×
520

×
521
        client := hn.RPC.SubscribeState()
×
522

×
523
        errChan := make(chan error, 1)
×
524
        done := make(chan struct{})
×
525
        go func() {
×
526
                for {
×
527
                        resp, err := client.Recv()
×
528
                        if err != nil {
×
529
                                errChan <- err
×
530
                                return
×
531
                        }
×
532

533
                        if predicate(resp.State) {
×
534
                                close(done)
×
535
                                return
×
536
                        }
×
537
                }
538
        }()
539

540
        for {
×
541
                select {
×
542
                case <-time.After(wait.NodeStartTimeout):
×
543
                        return fmt.Errorf("timeout waiting for server state")
×
544
                case err := <-errChan:
×
545
                        return fmt.Errorf("receive server state err: %w", err)
×
546

547
                case <-done:
×
548
                        return nil
×
549
                }
550
        }
551
}
552

553
// initLightningClient blocks until the lnd server is fully started and
// subscribes the harness node to graph topology updates. This method also
// spawns a lightning network watcher for this node, which watches for topology
// changes.
func (hn *HarnessNode) initLightningClient() error {
	// Wait until the server is fully started.
	if err := hn.WaitUntilServerActive(); err != nil {
		return fmt.Errorf("waiting for server active: %w", err)
	}

	// Set the harness node's pubkey to what the node claims in GetInfo.
	// The RPC must have been started at this point.
	if err := hn.attachPubKey(); err != nil {
		return err
	}

	// Launch the watcher that will hook into graph related topology change
	// from the PoV of this node. The channel is buffered so the watcher
	// can report a late error without blocking after we stop reading.
	started := make(chan error, 1)
	go hn.Watcher.topologyWatcher(hn.runCtx, started)

	select {
	// First time reading the channel indicates the topology client is
	// started.
	case err := <-started:
		if err != nil {
			return fmt.Errorf("create topology client stream "+
				"got err: %v", err)
		}

	case <-time.After(wait.DefaultTimeout):
		return fmt.Errorf("timeout creating topology client stream")
	}

	// Catch topology client stream error inside a goroutine. The goroutine
	// exits when the run context is canceled at node shutdown.
	go func() {
		select {
		case err := <-started:
			hn.printErrf("topology client: %v", err)

		case <-hn.runCtx.Done():
		}
	}()

	return nil
}
599

600
// attachPubKey queries an unlocked node to retrieve its public key.
601
func (hn *HarnessNode) attachPubKey() error {
×
602
        // Obtain the lnid of this node for quick identification purposes.
×
603
        info := hn.RPC.GetInfo()
×
604
        hn.PubKeyStr = info.IdentityPubkey
×
605

×
606
        pubkey, err := hex.DecodeString(info.IdentityPubkey)
×
607
        if err != nil {
×
608
                return err
×
609
        }
×
610
        copy(hn.PubKey[:], pubkey)
×
611

×
612
        return nil
×
613
}
614

615
// cleanup cleans up all the temporary files created by the node's process.
616
func (hn *HarnessNode) cleanup() error {
×
617
        if hn.Cfg.backupDBDir != "" {
×
618
                err := os.RemoveAll(hn.Cfg.backupDBDir)
×
619
                if err != nil {
×
620
                        return fmt.Errorf("unable to remove backup dir: %w",
×
621
                                err)
×
622
                }
×
623
        }
624

625
        return os.RemoveAll(hn.Cfg.BaseDir)
×
626
}
627

628
// WaitForProcessExit waits (up to wait.DefaultTimeout) for the lnd process to
// exit, then finalizes the node's log files and asserts from them that the
// node shut down cleanly. Any fatal process error is returned to the caller.
func (hn *HarnessNode) WaitForProcessExit() error {
	var errReturned error

	errChan := make(chan error, 1)
	go func() {
		errChan <- hn.cmd.Wait()
	}()

	select {
	case err := <-errChan:
		if err == nil {
			break
		}

		// If the process has already been canceled, we can exit early
		// as the logs have already been saved.
		if strings.Contains(err.Error(), "Wait was already called") {
			return nil
		}

		// The process may have already been killed in the test, in
		// that case we will skip the error and continue processing
		// the logs.
		if strings.Contains(err.Error(), "signal: killed") {
			break
		}

		// Otherwise, we print the error, break the select and save
		// logs.
		hn.printErrf("wait process exit got err: %v", err)
		errReturned = err

	case <-time.After(wait.DefaultTimeout):
		// On timeout we only log; log finalization below still runs.
		hn.printErrf("timeout waiting for process to exit")
	}

	// If the node has an open log file handle, inspect the log file
	// to verify that the node shut down correctly.
	if hn.logFile != nil {
		// Make sure log file is closed and renamed if necessary.
		filename := finalizeLogfile(hn)

		// Assert the node has shut down from the log file.
		err1 := assertNodeShutdown(filename)
		if err1 != nil {
			return fmt.Errorf("[%s]: assert shutdown failed in "+
				"log[%s]: %w", hn.Name(), filename, err1)
		}
	}

	// Rename the etcd.log file if the node was running on embedded etcd.
	finalizeEtcdLog(hn)

	return errReturned
}
685

686
// Stop attempts to stop the active lnd process: gracefully via the StopDaemon
// RPC when the RPC clients are available, otherwise by killing the process.
// It then closes the grpc connection and waits for the process to exit.
func (hn *HarnessNode) Stop() error {
	// Do nothing if the process is not running.
	if hn.runCtx == nil {
		hn.printErrf("found nil run context")
		return nil
	}

	// Stop the runCtx.
	hn.cancel()

	// If we ever reach the state where `Watcher` is initialized, it
	// means the node has an authed connection and all its RPC clients are
	// ready for use. Thus we will try to stop it via the RPC.
	if hn.Watcher != nil {
		// Don't watch for error because sometimes the RPC connection
		// gets closed before a response is returned.
		req := lnrpc.StopRequest{}

		// We have to use context.Background(), because both hn.Context
		// and hn.runCtx have been canceled by this point. We canceled
		// hn.runCtx just few lines above. hn.Context() is canceled by
		// Go before calling cleanup callbacks; HarnessNode.Stop() is
		// called from shutdownAllNodes which is called from a cleanup.
		ctxt, cancel := context.WithCancel(context.Background())
		defer cancel()

		err := wait.NoError(func() error {
			_, err := hn.RPC.LN.StopDaemon(ctxt, &req)
			if err == nil {
				return nil
			}

			// If the connection is already closed, we can exit
			// early as the node has already been shut down in the
			// test, e.g., in etcd leader health check test.
			if strings.Contains(err.Error(), "connection refused") {
				return nil
			}

			return err
		}, wait.DefaultTimeout)
		if err != nil {
			return fmt.Errorf("shutdown timeout: %w", err)
		}

		// Wait for goroutines to be finished.
		done := make(chan struct{})
		go func() {
			hn.Watcher.wg.Wait()
			close(done)
			hn.Watcher = nil
		}()

		// If the goroutines fail to finish before timeout, we'll print
		// the error to console and continue.
		select {
		case <-time.After(wait.DefaultTimeout):
			hn.printErrf("timeout on wait group")
		case <-done:
		}
	} else {
		// If the rpc clients are not initiated, we'd kill the process
		// manually.
		hn.printErrf("found nil RPC clients")
		if err := hn.Kill(); err != nil {
			// Skip the error if the process is already dead.
			if !strings.Contains(
				err.Error(), "process already finished",
			) {

				return fmt.Errorf("killing process got: %w",
					err)
			}
		}
	}

	// Close any attempts at further grpc connections.
	if hn.conn != nil {
		if err := hn.CloseConn(); err != nil {
			return err
		}
	}

	// Wait for lnd process to exit in the end.
	return hn.WaitForProcessExit()
}
773

774
// CloseConn closes the grpc connection.
775
func (hn *HarnessNode) CloseConn() error {
×
776
        err := status.Code(hn.conn.Close())
×
777
        switch err {
×
778
        case codes.OK:
×
779
                return nil
×
780

781
        // When the context is canceled above, we might get the
782
        // following error as the context is no longer active.
783
        case codes.Canceled:
×
784
                return nil
×
785

786
        case codes.Unknown:
×
787
                return fmt.Errorf("unknown error attempting to stop "+
×
788
                        "grpc client: %v", err)
×
789

790
        default:
×
791
                return fmt.Errorf("error attempting to stop "+
×
792
                        "grpc client: %v", err)
×
793
        }
794
}
795

796
// Shutdown stops the active lnd process and cleans up any temporary
797
// directories created along the way.
798
func (hn *HarnessNode) Shutdown() error {
×
799
        if err := hn.Stop(); err != nil {
×
800
                return err
×
801
        }
×
802

803
        // Exit if we want to skip the cleanup, which happens when a customized
804
        // base dir is used.
805
        if hn.Cfg.SkipCleanup {
×
806
                return nil
×
807
        }
×
808

809
        if err := hn.cleanup(); err != nil {
×
810
                return err
×
811
        }
×
812
        return nil
×
813
}
814

815
// Kill kills the lnd process.
//
// NOTE: this forcefully terminates the process with no graceful shutdown or
// cleanup; prefer Stop unless the process is unresponsive.
func (hn *HarnessNode) Kill() error {
	return hn.cmd.Process.Kill()
}
×
819

820
// KillAndWait kills the lnd process and waits for it to finish.
821
func (hn *HarnessNode) KillAndWait() error {
×
822
        err := hn.cmd.Process.Kill()
×
823
        if err != nil {
×
824
                return err
×
825
        }
×
826

827
        _, err = hn.cmd.Process.Wait()
×
828

×
829
        return err
×
830
}
831

832
// printErrf prints an error to the console.
833
func (hn *HarnessNode) printErrf(format string, a ...interface{}) {
×
834
        fmt.Printf("%v: itest error from [%s:%s]: %s\n", //nolint:forbidigo
×
835
                time.Now().UTC(), hn.Cfg.LogFilenamePrefix, hn.Cfg.Name,
×
836
                fmt.Sprintf(format, a...))
×
837
}
×
838

839
// BackupDB creates a backup of the current database.
840
func (hn *HarnessNode) BackupDB() error {
×
841
        if hn.Cfg.backupDBDir != "" {
×
842
                return fmt.Errorf("backup already created")
×
843
        }
×
844

845
        if hn.Cfg.postgresDBName != "" {
×
846
                // Backup database.
×
847
                backupDBName := hn.Cfg.postgresDBName + "_backup"
×
848
                err := executePgQuery(
×
NEW
849
                        hn.Context(), "CREATE DATABASE "+backupDBName+
×
NEW
850
                                " WITH TEMPLATE "+hn.Cfg.postgresDBName,
×
851
                )
×
852
                if err != nil {
×
853
                        return err
×
854
                }
×
855
        } else {
×
856
                // Backup files.
×
857
                tempDir, err := os.MkdirTemp("", "past-state")
×
858
                if err != nil {
×
859
                        return fmt.Errorf("unable to create temp db folder: %w",
×
860
                                err)
×
861
                }
×
862

863
                if err := copyAll(tempDir, hn.Cfg.DBDir()); err != nil {
×
864
                        return fmt.Errorf("unable to copy database files: %w",
×
865
                                err)
×
866
                }
×
867

868
                hn.Cfg.backupDBDir = tempDir
×
869
        }
870

871
        return nil
×
872
}
873

874
// RestoreDB restores a database backup.
//
// For postgres backends the current database is dropped and the backup
// database created by BackupDB is renamed into its place. For file based
// backends the previously copied files are copied back and the temporary
// backup directory is removed.
func (hn *HarnessNode) RestoreDB() error {
	if hn.Cfg.postgresDBName != "" {
		// Restore database.
		//
		// NOTE(review): the drop and rename are two separate
		// statements, so a failure in between leaves the node with no
		// database at all — acceptable for tests, but not atomic.
		backupDBName := hn.Cfg.postgresDBName + "_backup"
		err := executePgQuery(
			hn.Context(), "DROP DATABASE "+hn.Cfg.postgresDBName,
		)
		if err != nil {
			return err
		}
		err = executePgQuery(
			hn.Context(), "ALTER DATABASE "+backupDBName+
				" RENAME TO "+hn.Cfg.postgresDBName,
		)
		if err != nil {
			return err
		}
	} else {
		// Restore files.
		if hn.Cfg.backupDBDir == "" {
			return fmt.Errorf("no database backup created")
		}

		err := copyAll(hn.Cfg.DBDir(), hn.Cfg.backupDBDir)
		if err != nil {
			return fmt.Errorf("unable to copy database files: %w",
				err)
		}

		// The backup is single-use: remove it and clear the marker so
		// a new BackupDB call is allowed afterwards.
		if err := os.RemoveAll(hn.Cfg.backupDBDir); err != nil {
			return fmt.Errorf("unable to remove backup dir: %w",
				err)
		}
		hn.Cfg.backupDBDir = ""
	}

	return nil
}
913

914
// UpdateGlobalPolicy updates a node's global channel policy.
915
func (hn *HarnessNode) UpdateGlobalPolicy(policy *lnrpc.RoutingPolicy) {
×
916
        updateFeeReq := &lnrpc.PolicyUpdateRequest{
×
917
                BaseFeeMsat: policy.FeeBaseMsat,
×
918
                FeeRate: float64(policy.FeeRateMilliMsat) /
×
919
                        float64(1_000_000),
×
920
                TimeLockDelta: policy.TimeLockDelta,
×
921
                Scope:         &lnrpc.PolicyUpdateRequest_Global{Global: true},
×
922
                MaxHtlcMsat:   policy.MaxHtlcMsat,
×
923
        }
×
924
        hn.RPC.UpdateChannelPolicy(updateFeeReq)
×
925
}
×
926

927
// postgresDatabaseDsn returns the DSN for connecting to the given database on
// the local itest postgres instance (see the postgresDsn template above).
func postgresDatabaseDsn(dbName string) string {
	return fmt.Sprintf(postgresDsn, dbName)
}
×
930

931
// createTempPgDB creates a temp postgres database.
NEW
932
func createTempPgDB(ctx context.Context) (string, error) {
×
933
        // Create random database name.
×
934
        randBytes := make([]byte, 8)
×
935
        _, err := rand.Read(randBytes)
×
936
        if err != nil {
×
937
                return "", err
×
938
        }
×
939
        dbName := "itest_" + hex.EncodeToString(randBytes)
×
940

×
941
        // Create database.
×
NEW
942
        err = executePgQuery(ctx, "CREATE DATABASE "+dbName)
×
943
        if err != nil {
×
944
                return "", err
×
945
        }
×
946

947
        return dbName, nil
×
948
}
949

950
// executePgQuery executes a SQL statement in a postgres db.
NEW
951
func executePgQuery(ctx context.Context, query string) error {
×
NEW
952
        pool, err := pgxpool.Connect(ctx, postgresDatabaseDsn("postgres"))
×
953
        if err != nil {
×
954
                return fmt.Errorf("unable to connect to database: %w", err)
×
955
        }
×
956
        defer pool.Close()
×
957

×
NEW
958
        _, err = pool.Exec(ctx, query)
×
959
        return err
×
960
}
961

962
// renameFile is a helper to rename (log) files created during integration
963
// tests.
964
func renameFile(fromFileName, toFileName string) {
×
965
        err := os.Rename(fromFileName, toFileName)
×
966
        if err != nil {
×
967
                fmt.Printf("could not rename %s to %s: %v\n", // nolint:forbidigo
×
968
                        fromFileName, toFileName, err)
×
969
        }
×
970
}
971

972
// getFinalizedLogFilePrefix returns the finalize log filename.
973
func getFinalizedLogFilePrefix(hn *HarnessNode) string {
×
974
        pubKeyHex := hex.EncodeToString(
×
975
                hn.PubKey[:logPubKeyBytes],
×
976
        )
×
977

×
978
        return fmt.Sprintf("%s/%d-%s-%s-%s", GetLogDir(), hn.Cfg.NodeID,
×
979
                hn.Cfg.LogFilenamePrefix, hn.Cfg.Name, pubKeyHex)
×
980
}
×
981

982
// finalizeLogfile makes sure the log file cleanup function is initialized,
983
// even if no log file is created.
984
func finalizeLogfile(hn *HarnessNode) string {
×
985
        // Exit early if there's no log file.
×
986
        if hn.logFile == nil {
×
987
                return ""
×
988
        }
×
989

990
        hn.logFile.Close()
×
991

×
992
        // If logoutput flag is not set, return early.
×
993
        if !*logOutput {
×
994
                return ""
×
995
        }
×
996

997
        newFileName := fmt.Sprintf("%v.log", getFinalizedLogFilePrefix(hn))
×
998
        renameFile(hn.filename, newFileName)
×
999

×
1000
        return newFileName
×
1001
}
1002

1003
// assertNodeShutdown asserts that the node has shut down properly by checking
1004
// the last lines of the log file for the shutdown message "Shutdown complete".
1005
func assertNodeShutdown(filename string) error {
×
1006
        file, err := os.Open(filename)
×
1007
        if err != nil {
×
1008
                return err
×
1009
        }
×
1010
        defer file.Close()
×
1011

×
1012
        // Read more than one line to make sure we get the last line.
×
1013
        // const linesSize = 200
×
1014
        //
×
1015
        // NOTE: Reading 200 bytes of lines should be more than enough to find
×
1016
        // the `Shutdown complete` message. However, this is only true if the
×
1017
        // message is printed the last, which means `lnd` will properly wait
×
1018
        // for all its subsystems to shut down before exiting. Unfortunately
×
1019
        // there is at least one bug in the shutdown process where we don't
×
1020
        // wait for the chain backend to fully quit first, which can be easily
×
1021
        // reproduced by turning on `RPCC=trace` and use a linesSize of 200.
×
1022
        //
×
1023
        // TODO(yy): fix the shutdown process and remove this workaround by
×
1024
        // refactoring the lnd to use only one rpcclient, which requires quite
×
1025
        // some work on the btcwallet front.
×
1026
        const linesSize = 1000
×
1027

×
1028
        buf := make([]byte, linesSize)
×
1029
        stat, statErr := file.Stat()
×
1030
        if statErr != nil {
×
1031
                return err
×
1032
        }
×
1033

1034
        start := stat.Size() - linesSize
×
1035
        _, err = file.ReadAt(buf, start)
×
1036
        if err != nil {
×
1037
                return err
×
1038
        }
×
1039

1040
        // Exit early if the shutdown line is found.
1041
        if bytes.Contains(buf, []byte("Shutdown complete")) {
×
1042
                return nil
×
1043
        }
×
1044

1045
        // For etcd tests, we need to check for the line where the node is
1046
        // blocked at wallet unlock since we are testing how such a behavior is
1047
        // handled by etcd.
1048
        if bytes.Contains(buf, []byte("wallet and unlock")) {
×
1049
                return nil
×
1050
        }
×
1051

1052
        return fmt.Errorf("node did not shut down properly: found log "+
×
1053
                "lines: %s", buf)
×
1054
}
1055

1056
// finalizeEtcdLog saves the etcd log files when test ends.
1057
func finalizeEtcdLog(hn *HarnessNode) {
×
1058
        // Exit early if this is not etcd backend.
×
1059
        if hn.Cfg.DBBackend != BackendEtcd {
×
1060
                return
×
1061
        }
×
1062

1063
        etcdLogFileName := fmt.Sprintf("%s/etcd.log", hn.Cfg.LogDir)
×
1064
        newEtcdLogFileName := fmt.Sprintf("%v-etcd.log",
×
1065
                getFinalizedLogFilePrefix(hn),
×
1066
        )
×
1067

×
1068
        renameFile(etcdLogFileName, newEtcdLogFileName)
×
1069
}
1070

1071
// addLogFile creates log files used by this node.
1072
func addLogFile(hn *HarnessNode) error {
×
1073
        var fileName string
×
1074

×
1075
        dir := GetLogDir()
×
1076
        fileName = fmt.Sprintf("%s/%d-%s-%s-%s.log", dir, hn.Cfg.NodeID,
×
1077
                hn.Cfg.LogFilenamePrefix, hn.Cfg.Name,
×
1078
                hex.EncodeToString(hn.PubKey[:logPubKeyBytes]))
×
1079

×
1080
        // If the node's PubKey is not yet initialized, create a temporary file
×
1081
        // name. Later, after the PubKey has been initialized, the file can be
×
1082
        // moved to its final name with the PubKey included.
×
1083
        if bytes.Equal(hn.PubKey[:4], []byte{0, 0, 0, 0}) {
×
1084
                fileName = fmt.Sprintf("%s/%d-%s-%s-tmp__.log", dir,
×
1085
                        hn.Cfg.NodeID, hn.Cfg.LogFilenamePrefix,
×
1086
                        hn.Cfg.Name)
×
1087
        }
×
1088

1089
        // Create file if not exists, otherwise append.
1090
        file, err := os.OpenFile(fileName,
×
1091
                os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
×
1092
        if err != nil {
×
1093
                return err
×
1094
        }
×
1095

1096
        // Pass node's stderr to both errb and the file.
1097
        w := io.MultiWriter(hn.cmd.Stderr, file)
×
1098
        hn.cmd.Stderr = w
×
1099

×
1100
        // Pass the node's stdout only to the file.
×
1101
        hn.cmd.Stdout = file
×
1102

×
1103
        // Let the node keep a reference to this file, such that we can add to
×
1104
        // it if necessary.
×
1105
        hn.logFile = file
×
1106

×
1107
        hn.filename = fileName
×
1108

×
1109
        return nil
×
1110
}
1111

1112
// copyAll copies all files and directories from srcDir to dstDir recursively.
1113
// Note that this function does not support links.
1114
func copyAll(dstDir, srcDir string) error {
×
1115
        entries, err := os.ReadDir(srcDir)
×
1116
        if err != nil {
×
1117
                return err
×
1118
        }
×
1119

1120
        for _, entry := range entries {
×
1121
                srcPath := filepath.Join(srcDir, entry.Name())
×
1122
                dstPath := filepath.Join(dstDir, entry.Name())
×
1123

×
1124
                info, err := os.Stat(srcPath)
×
1125
                if err != nil {
×
1126
                        return err
×
1127
                }
×
1128

1129
                if info.IsDir() {
×
1130
                        err := os.Mkdir(dstPath, info.Mode())
×
1131
                        if err != nil && !os.IsExist(err) {
×
1132
                                return err
×
1133
                        }
×
1134

1135
                        err = copyAll(dstPath, srcPath)
×
1136
                        if err != nil {
×
1137
                                return err
×
1138
                        }
×
1139
                } else if err := CopyFile(dstPath, srcPath); err != nil {
×
1140
                        return err
×
1141
                }
×
1142
        }
1143

1144
        return nil
×
1145
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc