• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13541816950

26 Feb 2025 10:30AM UTC coverage: 58.836% (+0.02%) from 58.815%
13541816950

Pull #9550

github

ellemouton
graph/db: move various cache write calls to ChannelGraph

Here, we move the graph cache writes for AddLightningNode,
DeleteLightningNode, AddChannelEdge and MarkEdgeLive to the
ChannelGraph. Since these are writes, the cache is only updated if the
DB write is successful.
Pull Request #9550: graph: extract cache from CRUD [3]

73 of 85 new or added lines in 1 file covered. (85.88%)

288 existing lines in 17 files now uncovered.

136413 of 231851 relevant lines covered (58.84%)

19233.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.25
/peer/test_utils.go
1
package peer
2

3
import (
4
        "bytes"
5
        crand "crypto/rand"
6
        "encoding/binary"
7
        "io"
8
        "math/rand"
9
        "net"
10
        "sync/atomic"
11
        "testing"
12
        "time"
13

14
        "github.com/btcsuite/btcd/btcec/v2"
15
        "github.com/btcsuite/btcd/btcutil"
16
        "github.com/btcsuite/btcd/chaincfg/chainhash"
17
        "github.com/btcsuite/btcd/wire"
18
        "github.com/lightningnetwork/lnd/chainntnfs"
19
        "github.com/lightningnetwork/lnd/channeldb"
20
        "github.com/lightningnetwork/lnd/channelnotifier"
21
        "github.com/lightningnetwork/lnd/fn/v2"
22
        graphdb "github.com/lightningnetwork/lnd/graph/db"
23
        "github.com/lightningnetwork/lnd/htlcswitch"
24
        "github.com/lightningnetwork/lnd/input"
25
        "github.com/lightningnetwork/lnd/keychain"
26
        "github.com/lightningnetwork/lnd/kvdb"
27
        "github.com/lightningnetwork/lnd/lntest/channels"
28
        "github.com/lightningnetwork/lnd/lntest/mock"
29
        "github.com/lightningnetwork/lnd/lntypes"
30
        "github.com/lightningnetwork/lnd/lnwallet"
31
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
32
        "github.com/lightningnetwork/lnd/lnwire"
33
        "github.com/lightningnetwork/lnd/netann"
34
        "github.com/lightningnetwork/lnd/pool"
35
        "github.com/lightningnetwork/lnd/queue"
36
        "github.com/lightningnetwork/lnd/shachain"
37
        "github.com/stretchr/testify/require"
38
)
39

40
const (
41
        broadcastHeight = 100
42

43
        // timeout is a timeout value to use for tests which need to wait for
44
        // a return value on a channel.
45
        timeout = time.Second * 5
46

47
        // testCltvRejectDelta is the minimum delta between expiry and current
48
        // height below which htlcs are rejected.
49
        testCltvRejectDelta = 13
50
)
51

52
var (
53
        testKeyLoc = keychain.KeyLocator{Family: keychain.KeyFamilyNodeKey}
54
)
55

56
// noUpdate is a function which can be used as a parameter in
57
// createTestPeerWithChannel to call the setup code with no custom values on
58
// the channels set up.
59
var noUpdate = func(a, b *channeldb.OpenChannel) {}
9✔
60

61
type peerTestCtx struct {
62
        peer          *Brontide
63
        channel       *lnwallet.LightningChannel
64
        notifier      *mock.ChainNotifier
65
        publishTx     <-chan *wire.MsgTx
66
        mockSwitch    *mockMessageSwitch
67
        db            *channeldb.DB
68
        privKey       *btcec.PrivateKey
69
        mockConn      *mockMessageConn
70
        customChan    chan *customMsg
71
        chanStatusMgr *netann.ChanStatusManager
72
}
73

74
// createTestPeerWithChannel creates a channel between two nodes, and returns a
75
// peer for one of the nodes, together with the channel seen from both nodes.
76
// It takes an updateChan function which can be used to modify the default
77
// values on the channel states for each peer.
78
func createTestPeerWithChannel(t *testing.T, updateChan func(a,
79
        b *channeldb.OpenChannel)) (*peerTestCtx, error) {
14✔
80

14✔
81
        params := createTestPeer(t)
14✔
82

14✔
83
        var (
14✔
84
                publishTx     = params.publishTx
14✔
85
                mockSwitch    = params.mockSwitch
14✔
86
                alicePeer     = params.peer
14✔
87
                notifier      = params.notifier
14✔
88
                aliceKeyPriv  = params.privKey
14✔
89
                dbAlice       = params.db
14✔
90
                chanStatusMgr = params.chanStatusMgr
14✔
91
        )
14✔
92

14✔
93
        err := chanStatusMgr.Start()
14✔
94
        require.NoError(t, err)
14✔
95
        t.Cleanup(func() {
28✔
96
                require.NoError(t, chanStatusMgr.Stop())
14✔
97
        })
14✔
98

99
        aliceKeyPub := alicePeer.IdentityKey()
14✔
100
        estimator := alicePeer.cfg.FeeEstimator
14✔
101

14✔
102
        channelCapacity := btcutil.Amount(10 * 1e8)
14✔
103
        channelBal := channelCapacity / 2
14✔
104
        aliceDustLimit := btcutil.Amount(200)
14✔
105
        bobDustLimit := btcutil.Amount(1300)
14✔
106
        csvTimeoutAlice := uint32(5)
14✔
107
        csvTimeoutBob := uint32(4)
14✔
108
        isAliceInitiator := true
14✔
109

14✔
110
        prevOut := &wire.OutPoint{
14✔
111
                Hash:  channels.TestHdSeed,
14✔
112
                Index: 0,
14✔
113
        }
14✔
114
        fundingTxIn := wire.NewTxIn(prevOut, nil, nil)
14✔
115

14✔
116
        bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes(
14✔
117
                channels.BobsPrivKey,
14✔
118
        )
14✔
119

14✔
120
        aliceCfg := channeldb.ChannelConfig{
14✔
121
                ChannelStateBounds: channeldb.ChannelStateBounds{
14✔
122
                        MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()),
14✔
123
                        ChanReserve:      btcutil.Amount(rand.Int63()),
14✔
124
                        MinHTLC:          lnwire.MilliSatoshi(rand.Int63()),
14✔
125
                        MaxAcceptedHtlcs: uint16(rand.Int31()),
14✔
126
                },
14✔
127
                CommitmentParams: channeldb.CommitmentParams{
14✔
128
                        DustLimit: aliceDustLimit,
14✔
129
                        CsvDelay:  uint16(csvTimeoutAlice),
14✔
130
                },
14✔
131
                MultiSigKey: keychain.KeyDescriptor{
14✔
132
                        PubKey: aliceKeyPub,
14✔
133
                },
14✔
134
                RevocationBasePoint: keychain.KeyDescriptor{
14✔
135
                        PubKey: aliceKeyPub,
14✔
136
                },
14✔
137
                PaymentBasePoint: keychain.KeyDescriptor{
14✔
138
                        PubKey: aliceKeyPub,
14✔
139
                },
14✔
140
                DelayBasePoint: keychain.KeyDescriptor{
14✔
141
                        PubKey: aliceKeyPub,
14✔
142
                },
14✔
143
                HtlcBasePoint: keychain.KeyDescriptor{
14✔
144
                        PubKey: aliceKeyPub,
14✔
145
                },
14✔
146
        }
14✔
147
        bobCfg := channeldb.ChannelConfig{
14✔
148
                ChannelStateBounds: channeldb.ChannelStateBounds{
14✔
149
                        MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()),
14✔
150
                        ChanReserve:      btcutil.Amount(rand.Int63()),
14✔
151
                        MinHTLC:          lnwire.MilliSatoshi(rand.Int63()),
14✔
152
                        MaxAcceptedHtlcs: uint16(rand.Int31()),
14✔
153
                },
14✔
154
                CommitmentParams: channeldb.CommitmentParams{
14✔
155
                        DustLimit: bobDustLimit,
14✔
156
                        CsvDelay:  uint16(csvTimeoutBob),
14✔
157
                },
14✔
158
                MultiSigKey: keychain.KeyDescriptor{
14✔
159
                        PubKey: bobKeyPub,
14✔
160
                },
14✔
161
                RevocationBasePoint: keychain.KeyDescriptor{
14✔
162
                        PubKey: bobKeyPub,
14✔
163
                },
14✔
164
                PaymentBasePoint: keychain.KeyDescriptor{
14✔
165
                        PubKey: bobKeyPub,
14✔
166
                },
14✔
167
                DelayBasePoint: keychain.KeyDescriptor{
14✔
168
                        PubKey: bobKeyPub,
14✔
169
                },
14✔
170
                HtlcBasePoint: keychain.KeyDescriptor{
14✔
171
                        PubKey: bobKeyPub,
14✔
172
                },
14✔
173
        }
14✔
174

14✔
175
        bobRoot, err := chainhash.NewHash(bobKeyPriv.Serialize())
14✔
176
        if err != nil {
14✔
177
                return nil, err
×
178
        }
×
179
        bobPreimageProducer := shachain.NewRevocationProducer(*bobRoot)
14✔
180
        bobFirstRevoke, err := bobPreimageProducer.AtIndex(0)
14✔
181
        if err != nil {
14✔
182
                return nil, err
×
183
        }
×
184
        bobCommitPoint := input.ComputeCommitmentPoint(bobFirstRevoke[:])
14✔
185

14✔
186
        aliceRoot, err := chainhash.NewHash(aliceKeyPriv.Serialize())
14✔
187
        if err != nil {
14✔
188
                return nil, err
×
189
        }
×
190
        alicePreimageProducer := shachain.NewRevocationProducer(*aliceRoot)
14✔
191
        aliceFirstRevoke, err := alicePreimageProducer.AtIndex(0)
14✔
192
        if err != nil {
14✔
193
                return nil, err
×
194
        }
×
195
        aliceCommitPoint := input.ComputeCommitmentPoint(aliceFirstRevoke[:])
14✔
196

14✔
197
        aliceCommitTx, bobCommitTx, err := lnwallet.CreateCommitmentTxns(
14✔
198
                channelBal, channelBal, &aliceCfg, &bobCfg, aliceCommitPoint,
14✔
199
                bobCommitPoint, *fundingTxIn, channeldb.SingleFunderTweaklessBit,
14✔
200
                isAliceInitiator, 0,
14✔
201
        )
14✔
202
        if err != nil {
14✔
203
                return nil, err
×
204
        }
×
205

206
        dbBob := channeldb.OpenForTesting(t, t.TempDir())
14✔
207

14✔
208
        feePerKw, err := estimator.EstimateFeePerKW(1)
14✔
209
        if err != nil {
14✔
210
                return nil, err
×
211
        }
×
212

213
        // TODO(roasbeef): need to factor in commit fee?
214
        aliceCommit := channeldb.ChannelCommitment{
14✔
215
                CommitHeight:  0,
14✔
216
                LocalBalance:  lnwire.NewMSatFromSatoshis(channelBal),
14✔
217
                RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal),
14✔
218
                FeePerKw:      btcutil.Amount(feePerKw),
14✔
219
                CommitFee:     feePerKw.FeeForWeight(input.CommitWeight),
14✔
220
                CommitTx:      aliceCommitTx,
14✔
221
                CommitSig:     bytes.Repeat([]byte{1}, 71),
14✔
222
        }
14✔
223
        bobCommit := channeldb.ChannelCommitment{
14✔
224
                CommitHeight:  0,
14✔
225
                LocalBalance:  lnwire.NewMSatFromSatoshis(channelBal),
14✔
226
                RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal),
14✔
227
                FeePerKw:      btcutil.Amount(feePerKw),
14✔
228
                CommitFee:     feePerKw.FeeForWeight(input.CommitWeight),
14✔
229
                CommitTx:      bobCommitTx,
14✔
230
                CommitSig:     bytes.Repeat([]byte{1}, 71),
14✔
231
        }
14✔
232

14✔
233
        var chanIDBytes [8]byte
14✔
234
        if _, err := io.ReadFull(crand.Reader, chanIDBytes[:]); err != nil {
14✔
235
                return nil, err
×
236
        }
×
237

238
        shortChanID := lnwire.NewShortChanIDFromInt(
14✔
239
                binary.BigEndian.Uint64(chanIDBytes[:]),
14✔
240
        )
14✔
241

14✔
242
        aliceChannelState := &channeldb.OpenChannel{
14✔
243
                LocalChanCfg:            aliceCfg,
14✔
244
                RemoteChanCfg:           bobCfg,
14✔
245
                IdentityPub:             aliceKeyPub,
14✔
246
                FundingOutpoint:         *prevOut,
14✔
247
                ShortChannelID:          shortChanID,
14✔
248
                ChanType:                channeldb.SingleFunderTweaklessBit,
14✔
249
                IsInitiator:             isAliceInitiator,
14✔
250
                Capacity:                channelCapacity,
14✔
251
                RemoteCurrentRevocation: bobCommitPoint,
14✔
252
                RevocationProducer:      alicePreimageProducer,
14✔
253
                RevocationStore:         shachain.NewRevocationStore(),
14✔
254
                LocalCommitment:         aliceCommit,
14✔
255
                RemoteCommitment:        aliceCommit,
14✔
256
                Db:                      dbAlice.ChannelStateDB(),
14✔
257
                Packager:                channeldb.NewChannelPackager(shortChanID),
14✔
258
                FundingTxn:              channels.TestFundingTx,
14✔
259
        }
14✔
260
        bobChannelState := &channeldb.OpenChannel{
14✔
261
                LocalChanCfg:            bobCfg,
14✔
262
                RemoteChanCfg:           aliceCfg,
14✔
263
                IdentityPub:             bobKeyPub,
14✔
264
                FundingOutpoint:         *prevOut,
14✔
265
                ChanType:                channeldb.SingleFunderTweaklessBit,
14✔
266
                IsInitiator:             !isAliceInitiator,
14✔
267
                Capacity:                channelCapacity,
14✔
268
                RemoteCurrentRevocation: aliceCommitPoint,
14✔
269
                RevocationProducer:      bobPreimageProducer,
14✔
270
                RevocationStore:         shachain.NewRevocationStore(),
14✔
271
                LocalCommitment:         bobCommit,
14✔
272
                RemoteCommitment:        bobCommit,
14✔
273
                Db:                      dbBob.ChannelStateDB(),
14✔
274
                Packager:                channeldb.NewChannelPackager(shortChanID),
14✔
275
        }
14✔
276

14✔
277
        // Set custom values on the channel states.
14✔
278
        updateChan(aliceChannelState, bobChannelState)
14✔
279

14✔
280
        aliceAddr := alicePeer.cfg.Addr.Address
14✔
281
        if err := aliceChannelState.SyncPending(aliceAddr, 0); err != nil {
14✔
282
                return nil, err
×
283
        }
×
284

285
        bobAddr := &net.TCPAddr{
14✔
286
                IP:   net.ParseIP("127.0.0.1"),
14✔
287
                Port: 18556,
14✔
288
        }
14✔
289

14✔
290
        if err := bobChannelState.SyncPending(bobAddr, 0); err != nil {
14✔
291
                return nil, err
×
292
        }
×
293

294
        aliceSigner := input.NewMockSigner(
14✔
295
                []*btcec.PrivateKey{aliceKeyPriv}, nil,
14✔
296
        )
14✔
297
        bobSigner := input.NewMockSigner(
14✔
298
                []*btcec.PrivateKey{bobKeyPriv}, nil,
14✔
299
        )
14✔
300

14✔
301
        alicePool := lnwallet.NewSigPool(1, aliceSigner)
14✔
302
        channelAlice, err := lnwallet.NewLightningChannel(
14✔
303
                aliceSigner, aliceChannelState, alicePool,
14✔
304
                lnwallet.WithLeafStore(&lnwallet.MockAuxLeafStore{}),
14✔
305
                lnwallet.WithAuxSigner(lnwallet.NewAuxSignerMock(
14✔
306
                        lnwallet.EmptyMockJobHandler,
14✔
307
                )),
14✔
308
        )
14✔
309
        if err != nil {
14✔
310
                return nil, err
×
311
        }
×
312
        _ = alicePool.Start()
14✔
313
        t.Cleanup(func() {
28✔
314
                require.NoError(t, alicePool.Stop())
14✔
315
        })
14✔
316

317
        bobPool := lnwallet.NewSigPool(1, bobSigner)
14✔
318
        channelBob, err := lnwallet.NewLightningChannel(
14✔
319
                bobSigner, bobChannelState, bobPool,
14✔
320
                lnwallet.WithLeafStore(&lnwallet.MockAuxLeafStore{}),
14✔
321
                lnwallet.WithAuxSigner(lnwallet.NewAuxSignerMock(
14✔
322
                        lnwallet.EmptyMockJobHandler,
14✔
323
                )),
14✔
324
        )
14✔
325
        if err != nil {
14✔
326
                return nil, err
×
327
        }
×
328
        _ = bobPool.Start()
14✔
329
        t.Cleanup(func() {
28✔
330
                require.NoError(t, bobPool.Stop())
14✔
331
        })
14✔
332

333
        alicePeer.remoteFeatures = lnwire.NewFeatureVector(
14✔
334
                nil, lnwire.Features,
14✔
335
        )
14✔
336

14✔
337
        chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
14✔
338
        alicePeer.activeChannels.Store(chanID, channelAlice)
14✔
339

14✔
340
        alicePeer.wg.Add(1)
14✔
341
        go alicePeer.channelManager()
14✔
342

14✔
343
        return &peerTestCtx{
14✔
344
                peer:       alicePeer,
14✔
345
                channel:    channelBob,
14✔
346
                notifier:   notifier,
14✔
347
                publishTx:  publishTx,
14✔
348
                mockSwitch: mockSwitch,
14✔
349
                mockConn:   params.mockConn,
14✔
350
        }, nil
14✔
351
}
352

353
// mockMessageSwitch is a mock implementation of the messageSwitch interface
354
// used for testing without relying on a *htlcswitch.Switch in unit tests.
355
type mockMessageSwitch struct {
356
        links []htlcswitch.ChannelUpdateHandler
357
}
358

359
// BestHeight currently returns a dummy value.
360
func (m *mockMessageSwitch) BestHeight() uint32 {
×
361
        return 0
×
362
}
×
363

364
// CircuitModifier currently returns a dummy value.
365
func (m *mockMessageSwitch) CircuitModifier() htlcswitch.CircuitModifier {
×
366
        return nil
×
367
}
×
368

369
// RemoveLink currently does nothing.
370
func (m *mockMessageSwitch) RemoveLink(cid lnwire.ChannelID) {}
8✔
371

372
// CreateAndAddLink currently returns a dummy value.
373
func (m *mockMessageSwitch) CreateAndAddLink(cfg htlcswitch.ChannelLinkConfig,
374
        lnChan *lnwallet.LightningChannel) error {
×
375

×
376
        return nil
×
377
}
×
378

379
// GetLinksByInterface returns the active links.
380
func (m *mockMessageSwitch) GetLinksByInterface(pub [33]byte) (
381
        []htlcswitch.ChannelUpdateHandler, error) {
19✔
382

19✔
383
        return m.links, nil
19✔
384
}
19✔
385

386
// mockUpdateHandler is a mock implementation of the ChannelUpdateHandler
387
// interface. It is used in mockMessageSwitch's GetLinksByInterface method.
388
type mockUpdateHandler struct {
389
        cid                  lnwire.ChannelID
390
        isOutgoingAddBlocked atomic.Bool
391
        isIncomingAddBlocked atomic.Bool
392
}
393

394
// newMockUpdateHandler creates a new mockUpdateHandler.
395
func newMockUpdateHandler(cid lnwire.ChannelID) *mockUpdateHandler {
9✔
396
        return &mockUpdateHandler{
9✔
397
                cid: cid,
9✔
398
        }
9✔
399
}
9✔
400

401
// HandleChannelUpdate currently does nothing.
402
func (m *mockUpdateHandler) HandleChannelUpdate(msg lnwire.Message) {}
×
403

404
// ChanID returns the mockUpdateHandler's cid.
405
func (m *mockUpdateHandler) ChanID() lnwire.ChannelID { return m.cid }
18✔
406

407
// Bandwidth currently returns a dummy value.
408
func (m *mockUpdateHandler) Bandwidth() lnwire.MilliSatoshi { return 0 }
×
409

410
// EligibleToForward currently returns a dummy value.
411
func (m *mockUpdateHandler) EligibleToForward() bool { return false }
×
412

413
// MayAddOutgoingHtlc currently returns nil.
414
func (m *mockUpdateHandler) MayAddOutgoingHtlc(lnwire.MilliSatoshi) error { return nil }
×
415

416
type mockMessageConn struct {
417
        t *testing.T
418

419
        // MessageConn embeds our interface so that the mock does not need to
420
        // implement every function. The mock will panic if an unspecified function
421
        // is called.
422
        MessageConn
423

424
        // writtenMessages is a channel that our mock pushes written messages into.
425
        writtenMessages chan []byte
426

427
        readMessages   chan []byte
428
        curReadMessage []byte
429

430
        // writeRaceDetectingCounter is incremented on any function call
431
        // associated with writing to the connection. The race detector will
432
        // trigger on this counter if a data race exists.
433
        writeRaceDetectingCounter int
434

435
        // readRaceDetectingCounter is incremented on any function call
436
        // associated with reading from the connection. The race detector will
437
        // trigger on this counter if a data race exists.
438
        readRaceDetectingCounter int
439
}
440

441
func (m *mockUpdateHandler) EnableAdds(dir htlcswitch.LinkDirection) bool {
×
442
        if dir == htlcswitch.Outgoing {
×
443
                return m.isOutgoingAddBlocked.Swap(false)
×
444
        }
×
445

446
        return m.isIncomingAddBlocked.Swap(false)
×
447
}
448

449
func (m *mockUpdateHandler) DisableAdds(dir htlcswitch.LinkDirection) bool {
12✔
450
        if dir == htlcswitch.Outgoing {
20✔
451
                return !m.isOutgoingAddBlocked.Swap(true)
8✔
452
        }
8✔
453

454
        return !m.isIncomingAddBlocked.Swap(true)
4✔
455
}
456

457
func (m *mockUpdateHandler) IsFlushing(dir htlcswitch.LinkDirection) bool {
×
458
        switch dir {
×
459
        case htlcswitch.Outgoing:
×
460
                return m.isOutgoingAddBlocked.Load()
×
461
        case htlcswitch.Incoming:
×
462
                return m.isIncomingAddBlocked.Load()
×
463
        }
464

465
        return false
×
466
}
467

468
func (m *mockUpdateHandler) OnFlushedOnce(hook func()) {
4✔
469
        hook()
4✔
470
}
4✔
471
func (m *mockUpdateHandler) OnCommitOnce(
472
        _ htlcswitch.LinkDirection, hook func(),
473
) {
8✔
474

8✔
475
        hook()
8✔
476
}
8✔
477
func (m *mockUpdateHandler) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
×
478
        // TODO(proofofkeags): Implement
×
479
        c := make(chan fn.Result[lntypes.ChannelParty], 1)
×
480

×
481
        c <- fn.Errf[lntypes.ChannelParty]("InitStfu not yet implemented")
×
482

×
483
        return c
×
484
}
×
485

486
func newMockConn(t *testing.T, expectedMessages int) *mockMessageConn {
20✔
487
        return &mockMessageConn{
20✔
488
                t:               t,
20✔
489
                writtenMessages: make(chan []byte, expectedMessages),
20✔
490
                readMessages:    make(chan []byte, 1),
20✔
491
        }
20✔
492
}
20✔
493

494
// SetWriteDeadline mocks setting write deadline for our conn.
495
func (m *mockMessageConn) SetWriteDeadline(time.Time) error {
13✔
496
        m.writeRaceDetectingCounter++
13✔
497
        return nil
13✔
498
}
13✔
499

500
// Flush mocks a message conn flush.
501
func (m *mockMessageConn) Flush() (int, error) {
13✔
502
        m.writeRaceDetectingCounter++
13✔
503
        return 0, nil
13✔
504
}
13✔
505

506
// WriteMessage mocks sending of a message on our connection. It will push
507
// the bytes sent into the mock's writtenMessages channel.
508
func (m *mockMessageConn) WriteMessage(msg []byte) error {
13✔
509
        m.writeRaceDetectingCounter++
13✔
510

13✔
511
        msgCopy := make([]byte, len(msg))
13✔
512
        copy(msgCopy, msg)
13✔
513

13✔
514
        select {
13✔
515
        case m.writtenMessages <- msgCopy:
13✔
516
        case <-time.After(timeout):
×
517
                m.t.Fatalf("timeout sending message: %v", msgCopy)
×
518
        }
519

520
        return nil
13✔
521
}
522

523
// assertWrite asserts that our mock as had WriteMessage called with the byte
524
// slice we expect.
525
func (m *mockMessageConn) assertWrite(expected []byte) {
4✔
526
        select {
4✔
527
        case actual := <-m.writtenMessages:
4✔
528
                require.Equal(m.t, expected, actual)
4✔
529

530
        case <-time.After(timeout):
×
531
                m.t.Fatalf("timeout waiting for write: %v", expected)
×
532
        }
533
}
534

535
func (m *mockMessageConn) SetReadDeadline(t time.Time) error {
11✔
536
        m.readRaceDetectingCounter++
11✔
537
        return nil
11✔
538
}
11✔
539

540
func (m *mockMessageConn) ReadNextHeader() (uint32, error) {
7✔
541
        m.readRaceDetectingCounter++
7✔
542
        m.curReadMessage = <-m.readMessages
7✔
543
        return uint32(len(m.curReadMessage)), nil
7✔
544
}
7✔
545

546
func (m *mockMessageConn) ReadNextBody(buf []byte) ([]byte, error) {
4✔
547
        m.readRaceDetectingCounter++
4✔
548
        return m.curReadMessage, nil
4✔
549
}
4✔
550

551
func (m *mockMessageConn) RemoteAddr() net.Addr {
22✔
552
        return nil
22✔
553
}
22✔
554

555
func (m *mockMessageConn) LocalAddr() net.Addr {
3✔
556
        return nil
3✔
557
}
3✔
558

UNCOV
559
func (m *mockMessageConn) Close() error {
×
UNCOV
560
        return nil
×
UNCOV
561
}
×
562

563
// createTestPeer creates a new peer for testing and returns a context struct
564
// containing necessary handles and mock objects for conducting tests on peer
565
// functionalities.
566
func createTestPeer(t *testing.T) *peerTestCtx {
19✔
567
        nodeKeyLocator := keychain.KeyLocator{
19✔
568
                Family: keychain.KeyFamilyNodeKey,
19✔
569
        }
19✔
570

19✔
571
        aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes(
19✔
572
                channels.AlicesPrivKey,
19✔
573
        )
19✔
574

19✔
575
        aliceKeySigner := keychain.NewPrivKeyMessageSigner(
19✔
576
                aliceKeyPriv, nodeKeyLocator,
19✔
577
        )
19✔
578

19✔
579
        aliceAddr := &net.TCPAddr{
19✔
580
                IP:   net.ParseIP("127.0.0.1"),
19✔
581
                Port: 18555,
19✔
582
        }
19✔
583
        cfgAddr := &lnwire.NetAddress{
19✔
584
                IdentityKey: aliceKeyPub,
19✔
585
                Address:     aliceAddr,
19✔
586
                ChainNet:    wire.SimNet,
19✔
587
        }
19✔
588

19✔
589
        errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize)
19✔
590
        require.NoError(t, err)
19✔
591

19✔
592
        chainIO := &mock.ChainIO{
19✔
593
                BestHeight: broadcastHeight,
19✔
594
        }
19✔
595

19✔
596
        publishTx := make(chan *wire.MsgTx)
19✔
597
        wallet := &lnwallet.LightningWallet{
19✔
598
                WalletController: &mock.WalletController{
19✔
599
                        RootKey:               aliceKeyPriv,
19✔
600
                        PublishedTransactions: publishTx,
19✔
601
                },
19✔
602
        }
19✔
603

19✔
604
        const chanActiveTimeout = time.Minute
19✔
605

19✔
606
        dbPath := t.TempDir()
19✔
607

19✔
608
        graphBackend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
19✔
609
                DBPath:            dbPath,
19✔
610
                DBFileName:        "graph.db",
19✔
611
                NoFreelistSync:    true,
19✔
612
                AutoCompact:       false,
19✔
613
                AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
19✔
614
                DBTimeout:         kvdb.DefaultDBTimeout,
19✔
615
        })
19✔
616
        require.NoError(t, err)
19✔
617

19✔
618
        dbAliceGraph, err := graphdb.NewChannelGraph(&graphdb.Config{
19✔
619
                KVDB: graphBackend,
19✔
620
        })
19✔
621
        require.NoError(t, err)
19✔
622

19✔
623
        dbAliceChannel := channeldb.OpenForTesting(t, dbPath)
19✔
624

19✔
625
        nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner)
19✔
626

19✔
627
        chanStatusMgr, err := netann.NewChanStatusManager(&netann.
19✔
628
                ChanStatusConfig{
19✔
629
                ChanStatusSampleInterval: 30 * time.Second,
19✔
630
                ChanEnableTimeout:        chanActiveTimeout,
19✔
631
                ChanDisableTimeout:       2 * time.Minute,
19✔
632
                DB:                       dbAliceChannel.ChannelStateDB(),
19✔
633
                Graph:                    dbAliceGraph,
19✔
634
                MessageSigner:            nodeSignerAlice,
19✔
635
                OurPubKey:                aliceKeyPub,
19✔
636
                OurKeyLoc:                testKeyLoc,
19✔
637
                IsChannelActive: func(lnwire.ChannelID) bool {
19✔
638
                        return true
×
639
                },
×
640
                ApplyChannelUpdate: func(*lnwire.ChannelUpdate1,
641
                        *wire.OutPoint, bool) error {
×
642

×
643
                        return nil
×
644
                },
×
645
        })
646
        require.NoError(t, err)
19✔
647

19✔
648
        interceptableSwitchNotifier := &mock.ChainNotifier{
19✔
649
                EpochChan: make(chan *chainntnfs.BlockEpoch, 1),
19✔
650
        }
19✔
651
        interceptableSwitchNotifier.EpochChan <- &chainntnfs.BlockEpoch{
19✔
652
                Height: 1,
19✔
653
        }
19✔
654

19✔
655
        interceptableSwitch, err := htlcswitch.NewInterceptableSwitch(
19✔
656
                &htlcswitch.InterceptableSwitchConfig{
19✔
657
                        CltvRejectDelta:    testCltvRejectDelta,
19✔
658
                        CltvInterceptDelta: testCltvRejectDelta + 3,
19✔
659
                        Notifier:           interceptableSwitchNotifier,
19✔
660
                },
19✔
661
        )
19✔
662
        require.NoError(t, err)
19✔
663

19✔
664
        // TODO(yy): create interface for lnwallet.LightningChannel so we can
19✔
665
        // easily mock it without the following setups.
19✔
666
        notifier := &mock.ChainNotifier{
19✔
667
                SpendChan: make(chan *chainntnfs.SpendDetail),
19✔
668
                EpochChan: make(chan *chainntnfs.BlockEpoch),
19✔
669
                ConfChan:  make(chan *chainntnfs.TxConfirmation),
19✔
670
        }
19✔
671

19✔
672
        mockSwitch := &mockMessageSwitch{}
19✔
673

19✔
674
        // TODO(yy): change ChannelNotifier to be an interface.
19✔
675
        channelNotifier := channelnotifier.New(dbAliceChannel.ChannelStateDB())
19✔
676
        require.NoError(t, channelNotifier.Start())
19✔
677
        t.Cleanup(func() {
38✔
678
                require.NoError(t, channelNotifier.Stop(),
19✔
679
                        "stop channel notifier failed")
19✔
680
        })
19✔
681

682
        writeBufferPool := pool.NewWriteBuffer(
19✔
683
                pool.DefaultWriteBufferGCInterval,
19✔
684
                pool.DefaultWriteBufferExpiryInterval,
19✔
685
        )
19✔
686

19✔
687
        writePool := pool.NewWrite(
19✔
688
                writeBufferPool, 1, timeout,
19✔
689
        )
19✔
690
        require.NoError(t, writePool.Start())
19✔
691

19✔
692
        readBufferPool := pool.NewReadBuffer(
19✔
693
                pool.DefaultReadBufferGCInterval,
19✔
694
                pool.DefaultReadBufferExpiryInterval,
19✔
695
        )
19✔
696

19✔
697
        readPool := pool.NewRead(
19✔
698
                readBufferPool, 1, timeout,
19✔
699
        )
19✔
700
        require.NoError(t, readPool.Start())
19✔
701

19✔
702
        mockConn := newMockConn(t, 1)
19✔
703

19✔
704
        receivedCustomChan := make(chan *customMsg)
19✔
705

19✔
706
        var pubKey [33]byte
19✔
707
        copy(pubKey[:], aliceKeyPub.SerializeCompressed())
19✔
708

19✔
709
        estimator := chainfee.NewStaticEstimator(12500, 0)
19✔
710

19✔
711
        cfg := &Config{
19✔
712
                Addr:              cfgAddr,
19✔
713
                PubKeyBytes:       pubKey,
19✔
714
                ErrorBuffer:       errBuffer,
19✔
715
                ChainIO:           chainIO,
19✔
716
                Switch:            mockSwitch,
19✔
717
                ChanActiveTimeout: chanActiveTimeout,
19✔
718
                InterceptSwitch:   interceptableSwitch,
19✔
719
                ChannelDB:         dbAliceChannel.ChannelStateDB(),
19✔
720
                FeeEstimator:      estimator,
19✔
721
                Wallet:            wallet,
19✔
722
                ChainNotifier:     notifier,
19✔
723
                ChanStatusMgr:     chanStatusMgr,
19✔
724
                Features: lnwire.NewFeatureVector(
19✔
725
                        nil, lnwire.Features,
19✔
726
                ),
19✔
727
                DisconnectPeer: func(b *btcec.PublicKey) error {
19✔
728
                        return nil
×
729
                },
×
730
                ChannelNotifier:               channelNotifier,
731
                PrunePersistentPeerConnection: func([33]byte) {},
1✔
732
                LegacyFeatures:                lnwire.EmptyFeatureVector(),
733
                WritePool:                     writePool,
734
                ReadPool:                      readPool,
735
                Conn:                          mockConn,
736
                HandleCustomMessage: func(
737
                        peer [33]byte, msg *lnwire.Custom) error {
1✔
738

1✔
739
                        receivedCustomChan <- &customMsg{
1✔
740
                                peer: peer,
1✔
741
                                msg:  *msg,
1✔
742
                        }
1✔
743

1✔
744
                        return nil
1✔
745
                },
1✔
746
                PongBuf: make([]byte, lnwire.MaxPongBytes),
747
                FetchLastChanUpdate: func(chanID lnwire.ShortChannelID,
748
                ) (*lnwire.ChannelUpdate1, error) {
2✔
749

2✔
750
                        return &lnwire.ChannelUpdate1{}, nil
2✔
751
                },
2✔
752
        }
753

754
        alicePeer := NewBrontide(*cfg)
19✔
755

19✔
756
        return &peerTestCtx{
19✔
757
                publishTx:     publishTx,
19✔
758
                mockSwitch:    mockSwitch,
19✔
759
                peer:          alicePeer,
19✔
760
                notifier:      notifier,
19✔
761
                db:            dbAliceChannel,
19✔
762
                privKey:       aliceKeyPriv,
19✔
763
                mockConn:      mockConn,
19✔
764
                customChan:    receivedCustomChan,
19✔
765
                chanStatusMgr: chanStatusMgr,
19✔
766
        }
19✔
767
}
768

769
// startPeer invokes the `Start` method on the specified peer and handles any
770
// initial startup messages for testing.
771
func startPeer(t *testing.T, mockConn *mockMessageConn,
772
        peer *Brontide) <-chan struct{} {
3✔
773

3✔
774
        // Start the peer in a goroutine so that we can handle and test for
3✔
775
        // startup messages. Successfully sending and receiving init message,
3✔
776
        // indicates a successful startup.
3✔
777
        done := make(chan struct{})
3✔
778
        go func() {
6✔
779
                require.NoError(t, peer.Start())
3✔
780
                close(done)
3✔
781
        }()
3✔
782

783
        // Receive the init message that should be the first message received on
784
        // startup.
785
        rawMsg, err := fn.RecvOrTimeout[[]byte](
3✔
786
                mockConn.writtenMessages, timeout,
3✔
787
        )
3✔
788
        require.NoError(t, err)
3✔
789

3✔
790
        msgReader := bytes.NewReader(rawMsg)
3✔
791
        nextMsg, err := lnwire.ReadMessage(msgReader, 0)
3✔
792
        require.NoError(t, err)
3✔
793

3✔
794
        _, ok := nextMsg.(*lnwire.Init)
3✔
795
        require.True(t, ok)
3✔
796

3✔
797
        // Write the reply for the init message to complete the startup.
3✔
798
        initReplyMsg := lnwire.NewInitMessage(
3✔
799
                lnwire.NewRawFeatureVector(
3✔
800
                        lnwire.DataLossProtectRequired,
3✔
801
                        lnwire.GossipQueriesOptional,
3✔
802
                ),
3✔
803
                lnwire.NewRawFeatureVector(),
3✔
804
        )
3✔
805

3✔
806
        var b bytes.Buffer
3✔
807
        _, err = lnwire.WriteMessage(&b, initReplyMsg, 0)
3✔
808
        require.NoError(t, err)
3✔
809

3✔
810
        ok = fn.SendOrQuit[[]byte, struct{}](
3✔
811
                mockConn.readMessages, b.Bytes(), make(chan struct{}),
3✔
812
        )
3✔
813
        require.True(t, ok)
3✔
814

3✔
815
        return done
3✔
816
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc