• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13587276956

28 Feb 2025 11:32AM UTC coverage: 58.857% (+7.8%) from 51.098%
13587276956

Pull #9567

github

ellemouton
itest: make sure to not hit the natural ChannelUpdate rate limit

Channel Updates have a natural rate limit of 1 update per second due to
the fact that the timestamp carried in the update is only accurate to
the second. So we need to ensure that the next update we send in the
burst is at least 1 second after the last one.
Pull Request #9567: itest: make sure to not hit the natural ChannelUpdate rate limit

136630 of 232137 relevant lines covered (58.86%)

19201.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.3
/peer/test_utils.go
1
package peer
2

3
import (
4
        "bytes"
5
        crand "crypto/rand"
6
        "encoding/binary"
7
        "io"
8
        "math/rand"
9
        "net"
10
        "sync/atomic"
11
        "testing"
12
        "time"
13

14
        "github.com/btcsuite/btcd/btcec/v2"
15
        "github.com/btcsuite/btcd/btcutil"
16
        "github.com/btcsuite/btcd/chaincfg/chainhash"
17
        "github.com/btcsuite/btcd/wire"
18
        "github.com/lightningnetwork/lnd/chainntnfs"
19
        "github.com/lightningnetwork/lnd/channeldb"
20
        "github.com/lightningnetwork/lnd/channelnotifier"
21
        "github.com/lightningnetwork/lnd/fn/v2"
22
        graphdb "github.com/lightningnetwork/lnd/graph/db"
23
        "github.com/lightningnetwork/lnd/htlcswitch"
24
        "github.com/lightningnetwork/lnd/input"
25
        "github.com/lightningnetwork/lnd/keychain"
26
        "github.com/lightningnetwork/lnd/kvdb"
27
        "github.com/lightningnetwork/lnd/lntest/channels"
28
        "github.com/lightningnetwork/lnd/lntest/mock"
29
        "github.com/lightningnetwork/lnd/lntypes"
30
        "github.com/lightningnetwork/lnd/lnwallet"
31
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
32
        "github.com/lightningnetwork/lnd/lnwire"
33
        "github.com/lightningnetwork/lnd/netann"
34
        "github.com/lightningnetwork/lnd/pool"
35
        "github.com/lightningnetwork/lnd/queue"
36
        "github.com/lightningnetwork/lnd/shachain"
37
        "github.com/stretchr/testify/require"
38
)
39

40
const (
	// broadcastHeight is the best block height reported by the mock chain
	// backend used in these tests.
	broadcastHeight = 100

	// timeout bounds how long a test waits for a value to arrive on a
	// channel before giving up.
	timeout = 5 * time.Second

	// testCltvRejectDelta is the smallest allowed gap between an htlc's
	// expiry and the current height; htlcs below it are rejected.
	testCltvRejectDelta = 13
)
51

52
var (
53
        testKeyLoc = keychain.KeyLocator{Family: keychain.KeyFamilyNodeKey}
54
)
55

56
// noUpdate is a function which can be used as a parameter in
57
// createTestPeerWithChannel to call the setup code with no custom values on
58
// the channels set up.
59
var noUpdate = func(a, b *channeldb.OpenChannel) {}
9✔
60

61
type peerTestCtx struct {
62
        peer          *Brontide
63
        channel       *lnwallet.LightningChannel
64
        notifier      *mock.ChainNotifier
65
        publishTx     <-chan *wire.MsgTx
66
        mockSwitch    *mockMessageSwitch
67
        db            *channeldb.DB
68
        privKey       *btcec.PrivateKey
69
        mockConn      *mockMessageConn
70
        customChan    chan *customMsg
71
        chanStatusMgr *netann.ChanStatusManager
72
}
73

74
// createTestPeerWithChannel creates a channel between two nodes, and returns a
75
// peer for one of the nodes, together with the channel seen from both nodes.
76
// It takes an updateChan function which can be used to modify the default
77
// values on the channel states for each peer.
78
func createTestPeerWithChannel(t *testing.T, updateChan func(a,
79
        b *channeldb.OpenChannel)) (*peerTestCtx, error) {
14✔
80

14✔
81
        params := createTestPeer(t)
14✔
82

14✔
83
        var (
14✔
84
                publishTx     = params.publishTx
14✔
85
                mockSwitch    = params.mockSwitch
14✔
86
                alicePeer     = params.peer
14✔
87
                notifier      = params.notifier
14✔
88
                aliceKeyPriv  = params.privKey
14✔
89
                dbAlice       = params.db
14✔
90
                chanStatusMgr = params.chanStatusMgr
14✔
91
        )
14✔
92

14✔
93
        err := chanStatusMgr.Start()
14✔
94
        require.NoError(t, err)
14✔
95
        t.Cleanup(func() {
28✔
96
                require.NoError(t, chanStatusMgr.Stop())
14✔
97
        })
14✔
98

99
        aliceKeyPub := alicePeer.IdentityKey()
14✔
100
        estimator := alicePeer.cfg.FeeEstimator
14✔
101

14✔
102
        channelCapacity := btcutil.Amount(10 * 1e8)
14✔
103
        channelBal := channelCapacity / 2
14✔
104
        aliceDustLimit := btcutil.Amount(200)
14✔
105
        bobDustLimit := btcutil.Amount(1300)
14✔
106
        csvTimeoutAlice := uint32(5)
14✔
107
        csvTimeoutBob := uint32(4)
14✔
108
        isAliceInitiator := true
14✔
109

14✔
110
        prevOut := &wire.OutPoint{
14✔
111
                Hash:  channels.TestHdSeed,
14✔
112
                Index: 0,
14✔
113
        }
14✔
114
        fundingTxIn := wire.NewTxIn(prevOut, nil, nil)
14✔
115

14✔
116
        bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes(
14✔
117
                channels.BobsPrivKey,
14✔
118
        )
14✔
119

14✔
120
        aliceCfg := channeldb.ChannelConfig{
14✔
121
                ChannelStateBounds: channeldb.ChannelStateBounds{
14✔
122
                        MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()),
14✔
123
                        ChanReserve:      btcutil.Amount(rand.Int63()),
14✔
124
                        MinHTLC:          lnwire.MilliSatoshi(rand.Int63()),
14✔
125
                        MaxAcceptedHtlcs: uint16(rand.Int31()),
14✔
126
                },
14✔
127
                CommitmentParams: channeldb.CommitmentParams{
14✔
128
                        DustLimit: aliceDustLimit,
14✔
129
                        CsvDelay:  uint16(csvTimeoutAlice),
14✔
130
                },
14✔
131
                MultiSigKey: keychain.KeyDescriptor{
14✔
132
                        PubKey: aliceKeyPub,
14✔
133
                },
14✔
134
                RevocationBasePoint: keychain.KeyDescriptor{
14✔
135
                        PubKey: aliceKeyPub,
14✔
136
                },
14✔
137
                PaymentBasePoint: keychain.KeyDescriptor{
14✔
138
                        PubKey: aliceKeyPub,
14✔
139
                },
14✔
140
                DelayBasePoint: keychain.KeyDescriptor{
14✔
141
                        PubKey: aliceKeyPub,
14✔
142
                },
14✔
143
                HtlcBasePoint: keychain.KeyDescriptor{
14✔
144
                        PubKey: aliceKeyPub,
14✔
145
                },
14✔
146
        }
14✔
147
        bobCfg := channeldb.ChannelConfig{
14✔
148
                ChannelStateBounds: channeldb.ChannelStateBounds{
14✔
149
                        MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()),
14✔
150
                        ChanReserve:      btcutil.Amount(rand.Int63()),
14✔
151
                        MinHTLC:          lnwire.MilliSatoshi(rand.Int63()),
14✔
152
                        MaxAcceptedHtlcs: uint16(rand.Int31()),
14✔
153
                },
14✔
154
                CommitmentParams: channeldb.CommitmentParams{
14✔
155
                        DustLimit: bobDustLimit,
14✔
156
                        CsvDelay:  uint16(csvTimeoutBob),
14✔
157
                },
14✔
158
                MultiSigKey: keychain.KeyDescriptor{
14✔
159
                        PubKey: bobKeyPub,
14✔
160
                },
14✔
161
                RevocationBasePoint: keychain.KeyDescriptor{
14✔
162
                        PubKey: bobKeyPub,
14✔
163
                },
14✔
164
                PaymentBasePoint: keychain.KeyDescriptor{
14✔
165
                        PubKey: bobKeyPub,
14✔
166
                },
14✔
167
                DelayBasePoint: keychain.KeyDescriptor{
14✔
168
                        PubKey: bobKeyPub,
14✔
169
                },
14✔
170
                HtlcBasePoint: keychain.KeyDescriptor{
14✔
171
                        PubKey: bobKeyPub,
14✔
172
                },
14✔
173
        }
14✔
174

14✔
175
        bobRoot, err := chainhash.NewHash(bobKeyPriv.Serialize())
14✔
176
        if err != nil {
14✔
177
                return nil, err
×
178
        }
×
179
        bobPreimageProducer := shachain.NewRevocationProducer(*bobRoot)
14✔
180
        bobFirstRevoke, err := bobPreimageProducer.AtIndex(0)
14✔
181
        if err != nil {
14✔
182
                return nil, err
×
183
        }
×
184
        bobCommitPoint := input.ComputeCommitmentPoint(bobFirstRevoke[:])
14✔
185

14✔
186
        aliceRoot, err := chainhash.NewHash(aliceKeyPriv.Serialize())
14✔
187
        if err != nil {
14✔
188
                return nil, err
×
189
        }
×
190
        alicePreimageProducer := shachain.NewRevocationProducer(*aliceRoot)
14✔
191
        aliceFirstRevoke, err := alicePreimageProducer.AtIndex(0)
14✔
192
        if err != nil {
14✔
193
                return nil, err
×
194
        }
×
195
        aliceCommitPoint := input.ComputeCommitmentPoint(aliceFirstRevoke[:])
14✔
196

14✔
197
        aliceCommitTx, bobCommitTx, err := lnwallet.CreateCommitmentTxns(
14✔
198
                channelBal, channelBal, &aliceCfg, &bobCfg, aliceCommitPoint,
14✔
199
                bobCommitPoint, *fundingTxIn, channeldb.SingleFunderTweaklessBit,
14✔
200
                isAliceInitiator, 0,
14✔
201
        )
14✔
202
        if err != nil {
14✔
203
                return nil, err
×
204
        }
×
205

206
        dbBob := channeldb.OpenForTesting(t, t.TempDir())
14✔
207

14✔
208
        feePerKw, err := estimator.EstimateFeePerKW(1)
14✔
209
        if err != nil {
14✔
210
                return nil, err
×
211
        }
×
212

213
        // TODO(roasbeef): need to factor in commit fee?
214
        aliceCommit := channeldb.ChannelCommitment{
14✔
215
                CommitHeight:  0,
14✔
216
                LocalBalance:  lnwire.NewMSatFromSatoshis(channelBal),
14✔
217
                RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal),
14✔
218
                FeePerKw:      btcutil.Amount(feePerKw),
14✔
219
                CommitFee:     feePerKw.FeeForWeight(input.CommitWeight),
14✔
220
                CommitTx:      aliceCommitTx,
14✔
221
                CommitSig:     bytes.Repeat([]byte{1}, 71),
14✔
222
        }
14✔
223
        bobCommit := channeldb.ChannelCommitment{
14✔
224
                CommitHeight:  0,
14✔
225
                LocalBalance:  lnwire.NewMSatFromSatoshis(channelBal),
14✔
226
                RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal),
14✔
227
                FeePerKw:      btcutil.Amount(feePerKw),
14✔
228
                CommitFee:     feePerKw.FeeForWeight(input.CommitWeight),
14✔
229
                CommitTx:      bobCommitTx,
14✔
230
                CommitSig:     bytes.Repeat([]byte{1}, 71),
14✔
231
        }
14✔
232

14✔
233
        var chanIDBytes [8]byte
14✔
234
        if _, err := io.ReadFull(crand.Reader, chanIDBytes[:]); err != nil {
14✔
235
                return nil, err
×
236
        }
×
237

238
        shortChanID := lnwire.NewShortChanIDFromInt(
14✔
239
                binary.BigEndian.Uint64(chanIDBytes[:]),
14✔
240
        )
14✔
241

14✔
242
        aliceChannelState := &channeldb.OpenChannel{
14✔
243
                LocalChanCfg:            aliceCfg,
14✔
244
                RemoteChanCfg:           bobCfg,
14✔
245
                IdentityPub:             aliceKeyPub,
14✔
246
                FundingOutpoint:         *prevOut,
14✔
247
                ShortChannelID:          shortChanID,
14✔
248
                ChanType:                channeldb.SingleFunderTweaklessBit,
14✔
249
                IsInitiator:             isAliceInitiator,
14✔
250
                Capacity:                channelCapacity,
14✔
251
                RemoteCurrentRevocation: bobCommitPoint,
14✔
252
                RevocationProducer:      alicePreimageProducer,
14✔
253
                RevocationStore:         shachain.NewRevocationStore(),
14✔
254
                LocalCommitment:         aliceCommit,
14✔
255
                RemoteCommitment:        aliceCommit,
14✔
256
                Db:                      dbAlice.ChannelStateDB(),
14✔
257
                Packager:                channeldb.NewChannelPackager(shortChanID),
14✔
258
                FundingTxn:              channels.TestFundingTx,
14✔
259
        }
14✔
260
        bobChannelState := &channeldb.OpenChannel{
14✔
261
                LocalChanCfg:            bobCfg,
14✔
262
                RemoteChanCfg:           aliceCfg,
14✔
263
                IdentityPub:             bobKeyPub,
14✔
264
                FundingOutpoint:         *prevOut,
14✔
265
                ChanType:                channeldb.SingleFunderTweaklessBit,
14✔
266
                IsInitiator:             !isAliceInitiator,
14✔
267
                Capacity:                channelCapacity,
14✔
268
                RemoteCurrentRevocation: aliceCommitPoint,
14✔
269
                RevocationProducer:      bobPreimageProducer,
14✔
270
                RevocationStore:         shachain.NewRevocationStore(),
14✔
271
                LocalCommitment:         bobCommit,
14✔
272
                RemoteCommitment:        bobCommit,
14✔
273
                Db:                      dbBob.ChannelStateDB(),
14✔
274
                Packager:                channeldb.NewChannelPackager(shortChanID),
14✔
275
        }
14✔
276

14✔
277
        // Set custom values on the channel states.
14✔
278
        updateChan(aliceChannelState, bobChannelState)
14✔
279

14✔
280
        aliceAddr := alicePeer.cfg.Addr.Address
14✔
281
        if err := aliceChannelState.SyncPending(aliceAddr, 0); err != nil {
14✔
282
                return nil, err
×
283
        }
×
284

285
        bobAddr := &net.TCPAddr{
14✔
286
                IP:   net.ParseIP("127.0.0.1"),
14✔
287
                Port: 18556,
14✔
288
        }
14✔
289

14✔
290
        if err := bobChannelState.SyncPending(bobAddr, 0); err != nil {
14✔
291
                return nil, err
×
292
        }
×
293

294
        aliceSigner := input.NewMockSigner(
14✔
295
                []*btcec.PrivateKey{aliceKeyPriv}, nil,
14✔
296
        )
14✔
297
        bobSigner := input.NewMockSigner(
14✔
298
                []*btcec.PrivateKey{bobKeyPriv}, nil,
14✔
299
        )
14✔
300

14✔
301
        alicePool := lnwallet.NewSigPool(1, aliceSigner)
14✔
302
        channelAlice, err := lnwallet.NewLightningChannel(
14✔
303
                aliceSigner, aliceChannelState, alicePool,
14✔
304
                lnwallet.WithLeafStore(&lnwallet.MockAuxLeafStore{}),
14✔
305
                lnwallet.WithAuxSigner(lnwallet.NewAuxSignerMock(
14✔
306
                        lnwallet.EmptyMockJobHandler,
14✔
307
                )),
14✔
308
        )
14✔
309
        if err != nil {
14✔
310
                return nil, err
×
311
        }
×
312
        _ = alicePool.Start()
14✔
313
        t.Cleanup(func() {
28✔
314
                require.NoError(t, alicePool.Stop())
14✔
315
        })
14✔
316

317
        bobPool := lnwallet.NewSigPool(1, bobSigner)
14✔
318
        channelBob, err := lnwallet.NewLightningChannel(
14✔
319
                bobSigner, bobChannelState, bobPool,
14✔
320
                lnwallet.WithLeafStore(&lnwallet.MockAuxLeafStore{}),
14✔
321
                lnwallet.WithAuxSigner(lnwallet.NewAuxSignerMock(
14✔
322
                        lnwallet.EmptyMockJobHandler,
14✔
323
                )),
14✔
324
        )
14✔
325
        if err != nil {
14✔
326
                return nil, err
×
327
        }
×
328
        _ = bobPool.Start()
14✔
329
        t.Cleanup(func() {
28✔
330
                require.NoError(t, bobPool.Stop())
14✔
331
        })
14✔
332

333
        alicePeer.remoteFeatures = lnwire.NewFeatureVector(
14✔
334
                nil, lnwire.Features,
14✔
335
        )
14✔
336

14✔
337
        chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
14✔
338
        alicePeer.activeChannels.Store(chanID, channelAlice)
14✔
339

14✔
340
        alicePeer.wg.Add(1)
14✔
341
        go alicePeer.channelManager()
14✔
342

14✔
343
        return &peerTestCtx{
14✔
344
                peer:       alicePeer,
14✔
345
                channel:    channelBob,
14✔
346
                notifier:   notifier,
14✔
347
                publishTx:  publishTx,
14✔
348
                mockSwitch: mockSwitch,
14✔
349
                mockConn:   params.mockConn,
14✔
350
        }, nil
14✔
351
}
352

353
// mockMessageSwitch is a mock implementation of the messageSwitch interface
354
// used for testing without relying on a *htlcswitch.Switch in unit tests.
355
type mockMessageSwitch struct {
356
        links []htlcswitch.ChannelUpdateHandler
357
}
358

359
// BestHeight currently returns a dummy value.
360
func (m *mockMessageSwitch) BestHeight() uint32 {
×
361
        return 0
×
362
}
×
363

364
// CircuitModifier currently returns a dummy value.
365
func (m *mockMessageSwitch) CircuitModifier() htlcswitch.CircuitModifier {
×
366
        return nil
×
367
}
×
368

369
// RemoveLink currently does nothing.
370
func (m *mockMessageSwitch) RemoveLink(cid lnwire.ChannelID) {}
8✔
371

372
// CreateAndAddLink currently returns a dummy value.
373
func (m *mockMessageSwitch) CreateAndAddLink(cfg htlcswitch.ChannelLinkConfig,
374
        lnChan *lnwallet.LightningChannel) error {
×
375

×
376
        return nil
×
377
}
×
378

379
// GetLinksByInterface returns the active links.
380
func (m *mockMessageSwitch) GetLinksByInterface(pub [33]byte) (
381
        []htlcswitch.ChannelUpdateHandler, error) {
19✔
382

19✔
383
        return m.links, nil
19✔
384
}
19✔
385

386
// mockUpdateHandler is a mock implementation of the ChannelUpdateHandler
387
// interface. It is used in mockMessageSwitch's GetLinksByInterface method.
388
type mockUpdateHandler struct {
389
        cid                  lnwire.ChannelID
390
        isOutgoingAddBlocked atomic.Bool
391
        isIncomingAddBlocked atomic.Bool
392
}
393

394
// newMockUpdateHandler creates a new mockUpdateHandler.
395
func newMockUpdateHandler(cid lnwire.ChannelID) *mockUpdateHandler {
9✔
396
        return &mockUpdateHandler{
9✔
397
                cid: cid,
9✔
398
        }
9✔
399
}
9✔
400

401
// HandleChannelUpdate currently does nothing.
402
func (m *mockUpdateHandler) HandleChannelUpdate(msg lnwire.Message) {}
×
403

404
// ChanID returns the mockUpdateHandler's cid.
405
func (m *mockUpdateHandler) ChanID() lnwire.ChannelID { return m.cid }
18✔
406

407
// Bandwidth currently returns a dummy value.
408
func (m *mockUpdateHandler) Bandwidth() lnwire.MilliSatoshi { return 0 }
×
409

410
// EligibleToForward currently returns a dummy value.
411
func (m *mockUpdateHandler) EligibleToForward() bool { return false }
×
412

413
// MayAddOutgoingHtlc currently returns nil.
414
func (m *mockUpdateHandler) MayAddOutgoingHtlc(lnwire.MilliSatoshi) error { return nil }
×
415

416
type mockMessageConn struct {
417
        t *testing.T
418

419
        // MessageConn embeds our interface so that the mock does not need to
420
        // implement every function. The mock will panic if an unspecified function
421
        // is called.
422
        MessageConn
423

424
        // writtenMessages is a channel that our mock pushes written messages into.
425
        writtenMessages chan []byte
426

427
        readMessages   chan []byte
428
        curReadMessage []byte
429

430
        // writeRaceDetectingCounter is incremented on any function call
431
        // associated with writing to the connection. The race detector will
432
        // trigger on this counter if a data race exists.
433
        writeRaceDetectingCounter int
434

435
        // readRaceDetectingCounter is incremented on any function call
436
        // associated with reading from the connection. The race detector will
437
        // trigger on this counter if a data race exists.
438
        readRaceDetectingCounter int
439
}
440

441
func (m *mockUpdateHandler) EnableAdds(dir htlcswitch.LinkDirection) bool {
×
442
        if dir == htlcswitch.Outgoing {
×
443
                return m.isOutgoingAddBlocked.Swap(false)
×
444
        }
×
445

446
        return m.isIncomingAddBlocked.Swap(false)
×
447
}
448

449
func (m *mockUpdateHandler) DisableAdds(dir htlcswitch.LinkDirection) bool {
12✔
450
        if dir == htlcswitch.Outgoing {
20✔
451
                return !m.isOutgoingAddBlocked.Swap(true)
8✔
452
        }
8✔
453

454
        return !m.isIncomingAddBlocked.Swap(true)
4✔
455
}
456

457
func (m *mockUpdateHandler) IsFlushing(dir htlcswitch.LinkDirection) bool {
×
458
        switch dir {
×
459
        case htlcswitch.Outgoing:
×
460
                return m.isOutgoingAddBlocked.Load()
×
461
        case htlcswitch.Incoming:
×
462
                return m.isIncomingAddBlocked.Load()
×
463
        }
464

465
        return false
×
466
}
467

468
func (m *mockUpdateHandler) OnFlushedOnce(hook func()) {
4✔
469
        hook()
4✔
470
}
4✔
471
func (m *mockUpdateHandler) OnCommitOnce(
472
        _ htlcswitch.LinkDirection, hook func(),
473
) {
8✔
474

8✔
475
        hook()
8✔
476
}
8✔
477
func (m *mockUpdateHandler) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
×
478
        // TODO(proofofkeags): Implement
×
479
        c := make(chan fn.Result[lntypes.ChannelParty], 1)
×
480

×
481
        c <- fn.Errf[lntypes.ChannelParty]("InitStfu not yet implemented")
×
482

×
483
        return c
×
484
}
×
485

486
func newMockConn(t *testing.T, expectedMessages int) *mockMessageConn {
20✔
487
        return &mockMessageConn{
20✔
488
                t:               t,
20✔
489
                writtenMessages: make(chan []byte, expectedMessages),
20✔
490
                readMessages:    make(chan []byte, 1),
20✔
491
        }
20✔
492
}
20✔
493

494
// SetWriteDeadline mocks setting write deadline for our conn.
495
func (m *mockMessageConn) SetWriteDeadline(time.Time) error {
13✔
496
        m.writeRaceDetectingCounter++
13✔
497
        return nil
13✔
498
}
13✔
499

500
// Flush mocks a message conn flush.
501
func (m *mockMessageConn) Flush() (int, error) {
13✔
502
        m.writeRaceDetectingCounter++
13✔
503
        return 0, nil
13✔
504
}
13✔
505

506
// WriteMessage mocks sending of a message on our connection. It will push
507
// the bytes sent into the mock's writtenMessages channel.
508
func (m *mockMessageConn) WriteMessage(msg []byte) error {
13✔
509
        m.writeRaceDetectingCounter++
13✔
510

13✔
511
        msgCopy := make([]byte, len(msg))
13✔
512
        copy(msgCopy, msg)
13✔
513

13✔
514
        select {
13✔
515
        case m.writtenMessages <- msgCopy:
13✔
516
        case <-time.After(timeout):
×
517
                m.t.Fatalf("timeout sending message: %v", msgCopy)
×
518
        }
519

520
        return nil
13✔
521
}
522

523
// assertWrite asserts that our mock as had WriteMessage called with the byte
524
// slice we expect.
525
func (m *mockMessageConn) assertWrite(expected []byte) {
4✔
526
        select {
4✔
527
        case actual := <-m.writtenMessages:
4✔
528
                require.Equal(m.t, expected, actual)
4✔
529

530
        case <-time.After(timeout):
×
531
                m.t.Fatalf("timeout waiting for write: %v", expected)
×
532
        }
533
}
534

535
func (m *mockMessageConn) SetReadDeadline(t time.Time) error {
11✔
536
        m.readRaceDetectingCounter++
11✔
537
        return nil
11✔
538
}
11✔
539

540
func (m *mockMessageConn) ReadNextHeader() (uint32, error) {
7✔
541
        m.readRaceDetectingCounter++
7✔
542
        m.curReadMessage = <-m.readMessages
7✔
543
        return uint32(len(m.curReadMessage)), nil
7✔
544
}
7✔
545

546
func (m *mockMessageConn) ReadNextBody(buf []byte) ([]byte, error) {
4✔
547
        m.readRaceDetectingCounter++
4✔
548
        return m.curReadMessage, nil
4✔
549
}
4✔
550

551
func (m *mockMessageConn) RemoteAddr() net.Addr {
22✔
552
        return nil
22✔
553
}
22✔
554

555
func (m *mockMessageConn) LocalAddr() net.Addr {
3✔
556
        return nil
3✔
557
}
3✔
558

559
func (m *mockMessageConn) Close() error {
×
560
        return nil
×
561
}
×
562

563
// createTestPeer creates a new peer for testing and returns a context struct
564
// containing necessary handles and mock objects for conducting tests on peer
565
// functionalities.
566
func createTestPeer(t *testing.T) *peerTestCtx {
19✔
567
        nodeKeyLocator := keychain.KeyLocator{
19✔
568
                Family: keychain.KeyFamilyNodeKey,
19✔
569
        }
19✔
570

19✔
571
        aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes(
19✔
572
                channels.AlicesPrivKey,
19✔
573
        )
19✔
574

19✔
575
        aliceKeySigner := keychain.NewPrivKeyMessageSigner(
19✔
576
                aliceKeyPriv, nodeKeyLocator,
19✔
577
        )
19✔
578

19✔
579
        aliceAddr := &net.TCPAddr{
19✔
580
                IP:   net.ParseIP("127.0.0.1"),
19✔
581
                Port: 18555,
19✔
582
        }
19✔
583
        cfgAddr := &lnwire.NetAddress{
19✔
584
                IdentityKey: aliceKeyPub,
19✔
585
                Address:     aliceAddr,
19✔
586
                ChainNet:    wire.SimNet,
19✔
587
        }
19✔
588

19✔
589
        errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize)
19✔
590
        require.NoError(t, err)
19✔
591

19✔
592
        chainIO := &mock.ChainIO{
19✔
593
                BestHeight: broadcastHeight,
19✔
594
        }
19✔
595

19✔
596
        publishTx := make(chan *wire.MsgTx)
19✔
597
        wallet := &lnwallet.LightningWallet{
19✔
598
                WalletController: &mock.WalletController{
19✔
599
                        RootKey:               aliceKeyPriv,
19✔
600
                        PublishedTransactions: publishTx,
19✔
601
                },
19✔
602
        }
19✔
603

19✔
604
        const chanActiveTimeout = time.Minute
19✔
605

19✔
606
        dbPath := t.TempDir()
19✔
607

19✔
608
        graphBackend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
19✔
609
                DBPath:            dbPath,
19✔
610
                DBFileName:        "graph.db",
19✔
611
                NoFreelistSync:    true,
19✔
612
                AutoCompact:       false,
19✔
613
                AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
19✔
614
                DBTimeout:         kvdb.DefaultDBTimeout,
19✔
615
        })
19✔
616
        require.NoError(t, err)
19✔
617

19✔
618
        dbAliceGraph, err := graphdb.NewChannelGraph(&graphdb.Config{
19✔
619
                KVDB: graphBackend,
19✔
620
        })
19✔
621
        require.NoError(t, err)
19✔
622
        require.NoError(t, dbAliceGraph.Start())
19✔
623
        t.Cleanup(func() {
38✔
624
                require.NoError(t, dbAliceGraph.Stop())
19✔
625
        })
19✔
626

627
        dbAliceChannel := channeldb.OpenForTesting(t, dbPath)
19✔
628

19✔
629
        nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner)
19✔
630

19✔
631
        chanStatusMgr, err := netann.NewChanStatusManager(&netann.
19✔
632
                ChanStatusConfig{
19✔
633
                ChanStatusSampleInterval: 30 * time.Second,
19✔
634
                ChanEnableTimeout:        chanActiveTimeout,
19✔
635
                ChanDisableTimeout:       2 * time.Minute,
19✔
636
                DB:                       dbAliceChannel.ChannelStateDB(),
19✔
637
                Graph:                    dbAliceGraph,
19✔
638
                MessageSigner:            nodeSignerAlice,
19✔
639
                OurPubKey:                aliceKeyPub,
19✔
640
                OurKeyLoc:                testKeyLoc,
19✔
641
                IsChannelActive: func(lnwire.ChannelID) bool {
19✔
642
                        return true
×
643
                },
×
644
                ApplyChannelUpdate: func(*lnwire.ChannelUpdate1,
645
                        *wire.OutPoint, bool) error {
×
646

×
647
                        return nil
×
648
                },
×
649
        })
650
        require.NoError(t, err)
19✔
651

19✔
652
        interceptableSwitchNotifier := &mock.ChainNotifier{
19✔
653
                EpochChan: make(chan *chainntnfs.BlockEpoch, 1),
19✔
654
        }
19✔
655
        interceptableSwitchNotifier.EpochChan <- &chainntnfs.BlockEpoch{
19✔
656
                Height: 1,
19✔
657
        }
19✔
658

19✔
659
        interceptableSwitch, err := htlcswitch.NewInterceptableSwitch(
19✔
660
                &htlcswitch.InterceptableSwitchConfig{
19✔
661
                        CltvRejectDelta:    testCltvRejectDelta,
19✔
662
                        CltvInterceptDelta: testCltvRejectDelta + 3,
19✔
663
                        Notifier:           interceptableSwitchNotifier,
19✔
664
                },
19✔
665
        )
19✔
666
        require.NoError(t, err)
19✔
667

19✔
668
        // TODO(yy): create interface for lnwallet.LightningChannel so we can
19✔
669
        // easily mock it without the following setups.
19✔
670
        notifier := &mock.ChainNotifier{
19✔
671
                SpendChan: make(chan *chainntnfs.SpendDetail),
19✔
672
                EpochChan: make(chan *chainntnfs.BlockEpoch),
19✔
673
                ConfChan:  make(chan *chainntnfs.TxConfirmation),
19✔
674
        }
19✔
675

19✔
676
        mockSwitch := &mockMessageSwitch{}
19✔
677

19✔
678
        // TODO(yy): change ChannelNotifier to be an interface.
19✔
679
        channelNotifier := channelnotifier.New(dbAliceChannel.ChannelStateDB())
19✔
680
        require.NoError(t, channelNotifier.Start())
19✔
681
        t.Cleanup(func() {
38✔
682
                require.NoError(t, channelNotifier.Stop(),
19✔
683
                        "stop channel notifier failed")
19✔
684
        })
19✔
685

686
        writeBufferPool := pool.NewWriteBuffer(
19✔
687
                pool.DefaultWriteBufferGCInterval,
19✔
688
                pool.DefaultWriteBufferExpiryInterval,
19✔
689
        )
19✔
690

19✔
691
        writePool := pool.NewWrite(
19✔
692
                writeBufferPool, 1, timeout,
19✔
693
        )
19✔
694
        require.NoError(t, writePool.Start())
19✔
695

19✔
696
        readBufferPool := pool.NewReadBuffer(
19✔
697
                pool.DefaultReadBufferGCInterval,
19✔
698
                pool.DefaultReadBufferExpiryInterval,
19✔
699
        )
19✔
700

19✔
701
        readPool := pool.NewRead(
19✔
702
                readBufferPool, 1, timeout,
19✔
703
        )
19✔
704
        require.NoError(t, readPool.Start())
19✔
705

19✔
706
        mockConn := newMockConn(t, 1)
19✔
707

19✔
708
        receivedCustomChan := make(chan *customMsg)
19✔
709

19✔
710
        var pubKey [33]byte
19✔
711
        copy(pubKey[:], aliceKeyPub.SerializeCompressed())
19✔
712

19✔
713
        estimator := chainfee.NewStaticEstimator(12500, 0)
19✔
714

19✔
715
        cfg := &Config{
19✔
716
                Addr:              cfgAddr,
19✔
717
                PubKeyBytes:       pubKey,
19✔
718
                ErrorBuffer:       errBuffer,
19✔
719
                ChainIO:           chainIO,
19✔
720
                Switch:            mockSwitch,
19✔
721
                ChanActiveTimeout: chanActiveTimeout,
19✔
722
                InterceptSwitch:   interceptableSwitch,
19✔
723
                ChannelDB:         dbAliceChannel.ChannelStateDB(),
19✔
724
                FeeEstimator:      estimator,
19✔
725
                Wallet:            wallet,
19✔
726
                ChainNotifier:     notifier,
19✔
727
                ChanStatusMgr:     chanStatusMgr,
19✔
728
                Features: lnwire.NewFeatureVector(
19✔
729
                        nil, lnwire.Features,
19✔
730
                ),
19✔
731
                DisconnectPeer: func(b *btcec.PublicKey) error {
19✔
732
                        return nil
×
733
                },
×
734
                ChannelNotifier:               channelNotifier,
735
                PrunePersistentPeerConnection: func([33]byte) {},
1✔
736
                LegacyFeatures:                lnwire.EmptyFeatureVector(),
737
                WritePool:                     writePool,
738
                ReadPool:                      readPool,
739
                Conn:                          mockConn,
740
                HandleCustomMessage: func(
741
                        peer [33]byte, msg *lnwire.Custom) error {
1✔
742

1✔
743
                        receivedCustomChan <- &customMsg{
1✔
744
                                peer: peer,
1✔
745
                                msg:  *msg,
1✔
746
                        }
1✔
747

1✔
748
                        return nil
1✔
749
                },
1✔
750
                PongBuf: make([]byte, lnwire.MaxPongBytes),
751
                FetchLastChanUpdate: func(chanID lnwire.ShortChannelID,
752
                ) (*lnwire.ChannelUpdate1, error) {
2✔
753

2✔
754
                        return &lnwire.ChannelUpdate1{}, nil
2✔
755
                },
2✔
756
        }
757

758
        alicePeer := NewBrontide(*cfg)
19✔
759

19✔
760
        return &peerTestCtx{
19✔
761
                publishTx:     publishTx,
19✔
762
                mockSwitch:    mockSwitch,
19✔
763
                peer:          alicePeer,
19✔
764
                notifier:      notifier,
19✔
765
                db:            dbAliceChannel,
19✔
766
                privKey:       aliceKeyPriv,
19✔
767
                mockConn:      mockConn,
19✔
768
                customChan:    receivedCustomChan,
19✔
769
                chanStatusMgr: chanStatusMgr,
19✔
770
        }
19✔
771
}
772

773
// startPeer invokes the `Start` method on the specified peer and handles any
774
// initial startup messages for testing.
775
func startPeer(t *testing.T, mockConn *mockMessageConn,
776
        peer *Brontide) <-chan struct{} {
3✔
777

3✔
778
        // Start the peer in a goroutine so that we can handle and test for
3✔
779
        // startup messages. Successfully sending and receiving init message,
3✔
780
        // indicates a successful startup.
3✔
781
        done := make(chan struct{})
3✔
782
        go func() {
6✔
783
                require.NoError(t, peer.Start())
3✔
784
                close(done)
3✔
785
        }()
3✔
786

787
        // Receive the init message that should be the first message received on
788
        // startup.
789
        rawMsg, err := fn.RecvOrTimeout[[]byte](
3✔
790
                mockConn.writtenMessages, timeout,
3✔
791
        )
3✔
792
        require.NoError(t, err)
3✔
793

3✔
794
        msgReader := bytes.NewReader(rawMsg)
3✔
795
        nextMsg, err := lnwire.ReadMessage(msgReader, 0)
3✔
796
        require.NoError(t, err)
3✔
797

3✔
798
        _, ok := nextMsg.(*lnwire.Init)
3✔
799
        require.True(t, ok)
3✔
800

3✔
801
        // Write the reply for the init message to complete the startup.
3✔
802
        initReplyMsg := lnwire.NewInitMessage(
3✔
803
                lnwire.NewRawFeatureVector(
3✔
804
                        lnwire.DataLossProtectRequired,
3✔
805
                        lnwire.GossipQueriesOptional,
3✔
806
                ),
3✔
807
                lnwire.NewRawFeatureVector(),
3✔
808
        )
3✔
809

3✔
810
        var b bytes.Buffer
3✔
811
        _, err = lnwire.WriteMessage(&b, initReplyMsg, 0)
3✔
812
        require.NoError(t, err)
3✔
813

3✔
814
        ok = fn.SendOrQuit[[]byte, struct{}](
3✔
815
                mockConn.readMessages, b.Bytes(), make(chan struct{}),
3✔
816
        )
3✔
817
        require.True(t, ok)
3✔
818

3✔
819
        return done
3✔
820
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc