• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11909028775

19 Nov 2024 08:27AM UTC coverage: 58.987% (+0.04%) from 58.947%
11909028775

Pull #9258

github

yyforyongyu
chainntnfs: skip duplicate `numConfsLeft` notifications

This commit adds a new state to the `ConfNtfn` struct to start tracking
the number of confs left to be notified to avoid sending duplicate
notifications.
Pull Request #9258: chainntnfs: fix missing notifications

59 of 72 new or added lines in 5 files covered. (81.94%)

176 existing lines in 31 files now uncovered.

132559 of 224725 relevant lines covered (58.99%)

19680.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.93
/peer/test_utils.go
1
package peer
2

3
import (
4
        "bytes"
5
        crand "crypto/rand"
6
        "encoding/binary"
7
        "io"
8
        "math/rand"
9
        "net"
10
        "sync/atomic"
11
        "testing"
12
        "time"
13

14
        "github.com/btcsuite/btcd/btcec/v2"
15
        "github.com/btcsuite/btcd/btcutil"
16
        "github.com/btcsuite/btcd/chaincfg/chainhash"
17
        "github.com/btcsuite/btcd/wire"
18
        "github.com/lightningnetwork/lnd/chainntnfs"
19
        "github.com/lightningnetwork/lnd/channeldb"
20
        "github.com/lightningnetwork/lnd/channelnotifier"
21
        "github.com/lightningnetwork/lnd/fn"
22
        "github.com/lightningnetwork/lnd/htlcswitch"
23
        "github.com/lightningnetwork/lnd/input"
24
        "github.com/lightningnetwork/lnd/keychain"
25
        "github.com/lightningnetwork/lnd/lntest/channels"
26
        "github.com/lightningnetwork/lnd/lntest/mock"
27
        "github.com/lightningnetwork/lnd/lnwallet"
28
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
29
        "github.com/lightningnetwork/lnd/lnwire"
30
        "github.com/lightningnetwork/lnd/netann"
31
        "github.com/lightningnetwork/lnd/pool"
32
        "github.com/lightningnetwork/lnd/queue"
33
        "github.com/lightningnetwork/lnd/shachain"
34
        "github.com/stretchr/testify/require"
35
)
36

37
const (
	// broadcastHeight is the best block height reported by the mock chain
	// backend that createTestPeer hands to the peer.
	broadcastHeight = 100

	// timeout bounds how long a test waits for a value to arrive on a
	// channel.
	timeout = 5 * time.Second

	// testCltvRejectDelta is the minimum delta between an htlc's expiry
	// and the current height; htlcs below this delta are rejected.
	testCltvRejectDelta = 13
)
48

49
var (
50
        testKeyLoc = keychain.KeyLocator{Family: keychain.KeyFamilyNodeKey}
51
)
52

53
// noUpdate is a function which can be used as a parameter in
54
// createTestPeerWithChannel to call the setup code with no custom values on
55
// the channels set up.
56
var noUpdate = func(a, b *channeldb.OpenChannel) {}
9✔
57

58
type peerTestCtx struct {
59
        peer          *Brontide
60
        channel       *lnwallet.LightningChannel
61
        notifier      *mock.ChainNotifier
62
        publishTx     <-chan *wire.MsgTx
63
        mockSwitch    *mockMessageSwitch
64
        db            *channeldb.DB
65
        privKey       *btcec.PrivateKey
66
        mockConn      *mockMessageConn
67
        customChan    chan *customMsg
68
        chanStatusMgr *netann.ChanStatusManager
69
}
70

71
// createTestPeerWithChannel creates a channel between two nodes, and returns a
72
// peer for one of the nodes, together with the channel seen from both nodes.
73
// It takes an updateChan function which can be used to modify the default
74
// values on the channel states for each peer.
75
func createTestPeerWithChannel(t *testing.T, updateChan func(a,
76
        b *channeldb.OpenChannel)) (*peerTestCtx, error) {
14✔
77

14✔
78
        params := createTestPeer(t)
14✔
79

14✔
80
        var (
14✔
81
                publishTx     = params.publishTx
14✔
82
                mockSwitch    = params.mockSwitch
14✔
83
                alicePeer     = params.peer
14✔
84
                notifier      = params.notifier
14✔
85
                aliceKeyPriv  = params.privKey
14✔
86
                dbAlice       = params.db
14✔
87
                chanStatusMgr = params.chanStatusMgr
14✔
88
        )
14✔
89

14✔
90
        err := chanStatusMgr.Start()
14✔
91
        require.NoError(t, err)
14✔
92
        t.Cleanup(func() {
28✔
93
                require.NoError(t, chanStatusMgr.Stop())
14✔
94
        })
14✔
95

96
        aliceKeyPub := alicePeer.IdentityKey()
14✔
97
        estimator := alicePeer.cfg.FeeEstimator
14✔
98

14✔
99
        channelCapacity := btcutil.Amount(10 * 1e8)
14✔
100
        channelBal := channelCapacity / 2
14✔
101
        aliceDustLimit := btcutil.Amount(200)
14✔
102
        bobDustLimit := btcutil.Amount(1300)
14✔
103
        csvTimeoutAlice := uint32(5)
14✔
104
        csvTimeoutBob := uint32(4)
14✔
105
        isAliceInitiator := true
14✔
106

14✔
107
        prevOut := &wire.OutPoint{
14✔
108
                Hash:  channels.TestHdSeed,
14✔
109
                Index: 0,
14✔
110
        }
14✔
111
        fundingTxIn := wire.NewTxIn(prevOut, nil, nil)
14✔
112

14✔
113
        bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes(
14✔
114
                channels.BobsPrivKey,
14✔
115
        )
14✔
116

14✔
117
        aliceCfg := channeldb.ChannelConfig{
14✔
118
                ChannelStateBounds: channeldb.ChannelStateBounds{
14✔
119
                        MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()),
14✔
120
                        ChanReserve:      btcutil.Amount(rand.Int63()),
14✔
121
                        MinHTLC:          lnwire.MilliSatoshi(rand.Int63()),
14✔
122
                        MaxAcceptedHtlcs: uint16(rand.Int31()),
14✔
123
                },
14✔
124
                CommitmentParams: channeldb.CommitmentParams{
14✔
125
                        DustLimit: aliceDustLimit,
14✔
126
                        CsvDelay:  uint16(csvTimeoutAlice),
14✔
127
                },
14✔
128
                MultiSigKey: keychain.KeyDescriptor{
14✔
129
                        PubKey: aliceKeyPub,
14✔
130
                },
14✔
131
                RevocationBasePoint: keychain.KeyDescriptor{
14✔
132
                        PubKey: aliceKeyPub,
14✔
133
                },
14✔
134
                PaymentBasePoint: keychain.KeyDescriptor{
14✔
135
                        PubKey: aliceKeyPub,
14✔
136
                },
14✔
137
                DelayBasePoint: keychain.KeyDescriptor{
14✔
138
                        PubKey: aliceKeyPub,
14✔
139
                },
14✔
140
                HtlcBasePoint: keychain.KeyDescriptor{
14✔
141
                        PubKey: aliceKeyPub,
14✔
142
                },
14✔
143
        }
14✔
144
        bobCfg := channeldb.ChannelConfig{
14✔
145
                ChannelStateBounds: channeldb.ChannelStateBounds{
14✔
146
                        MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()),
14✔
147
                        ChanReserve:      btcutil.Amount(rand.Int63()),
14✔
148
                        MinHTLC:          lnwire.MilliSatoshi(rand.Int63()),
14✔
149
                        MaxAcceptedHtlcs: uint16(rand.Int31()),
14✔
150
                },
14✔
151
                CommitmentParams: channeldb.CommitmentParams{
14✔
152
                        DustLimit: bobDustLimit,
14✔
153
                        CsvDelay:  uint16(csvTimeoutBob),
14✔
154
                },
14✔
155
                MultiSigKey: keychain.KeyDescriptor{
14✔
156
                        PubKey: bobKeyPub,
14✔
157
                },
14✔
158
                RevocationBasePoint: keychain.KeyDescriptor{
14✔
159
                        PubKey: bobKeyPub,
14✔
160
                },
14✔
161
                PaymentBasePoint: keychain.KeyDescriptor{
14✔
162
                        PubKey: bobKeyPub,
14✔
163
                },
14✔
164
                DelayBasePoint: keychain.KeyDescriptor{
14✔
165
                        PubKey: bobKeyPub,
14✔
166
                },
14✔
167
                HtlcBasePoint: keychain.KeyDescriptor{
14✔
168
                        PubKey: bobKeyPub,
14✔
169
                },
14✔
170
        }
14✔
171

14✔
172
        bobRoot, err := chainhash.NewHash(bobKeyPriv.Serialize())
14✔
173
        if err != nil {
14✔
174
                return nil, err
×
175
        }
×
176
        bobPreimageProducer := shachain.NewRevocationProducer(*bobRoot)
14✔
177
        bobFirstRevoke, err := bobPreimageProducer.AtIndex(0)
14✔
178
        if err != nil {
14✔
179
                return nil, err
×
180
        }
×
181
        bobCommitPoint := input.ComputeCommitmentPoint(bobFirstRevoke[:])
14✔
182

14✔
183
        aliceRoot, err := chainhash.NewHash(aliceKeyPriv.Serialize())
14✔
184
        if err != nil {
14✔
185
                return nil, err
×
186
        }
×
187
        alicePreimageProducer := shachain.NewRevocationProducer(*aliceRoot)
14✔
188
        aliceFirstRevoke, err := alicePreimageProducer.AtIndex(0)
14✔
189
        if err != nil {
14✔
190
                return nil, err
×
191
        }
×
192
        aliceCommitPoint := input.ComputeCommitmentPoint(aliceFirstRevoke[:])
14✔
193

14✔
194
        aliceCommitTx, bobCommitTx, err := lnwallet.CreateCommitmentTxns(
14✔
195
                channelBal, channelBal, &aliceCfg, &bobCfg, aliceCommitPoint,
14✔
196
                bobCommitPoint, *fundingTxIn, channeldb.SingleFunderTweaklessBit,
14✔
197
                isAliceInitiator, 0,
14✔
198
        )
14✔
199
        if err != nil {
14✔
200
                return nil, err
×
201
        }
×
202

203
        dbBob, err := channeldb.Open(t.TempDir())
14✔
204
        if err != nil {
14✔
205
                return nil, err
×
206
        }
×
207
        t.Cleanup(func() {
28✔
208
                require.NoError(t, dbBob.Close())
14✔
209
        })
14✔
210

211
        feePerKw, err := estimator.EstimateFeePerKW(1)
14✔
212
        if err != nil {
14✔
213
                return nil, err
×
214
        }
×
215

216
        // TODO(roasbeef): need to factor in commit fee?
217
        aliceCommit := channeldb.ChannelCommitment{
14✔
218
                CommitHeight:  0,
14✔
219
                LocalBalance:  lnwire.NewMSatFromSatoshis(channelBal),
14✔
220
                RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal),
14✔
221
                FeePerKw:      btcutil.Amount(feePerKw),
14✔
222
                CommitFee:     feePerKw.FeeForWeight(input.CommitWeight),
14✔
223
                CommitTx:      aliceCommitTx,
14✔
224
                CommitSig:     bytes.Repeat([]byte{1}, 71),
14✔
225
        }
14✔
226
        bobCommit := channeldb.ChannelCommitment{
14✔
227
                CommitHeight:  0,
14✔
228
                LocalBalance:  lnwire.NewMSatFromSatoshis(channelBal),
14✔
229
                RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal),
14✔
230
                FeePerKw:      btcutil.Amount(feePerKw),
14✔
231
                CommitFee:     feePerKw.FeeForWeight(input.CommitWeight),
14✔
232
                CommitTx:      bobCommitTx,
14✔
233
                CommitSig:     bytes.Repeat([]byte{1}, 71),
14✔
234
        }
14✔
235

14✔
236
        var chanIDBytes [8]byte
14✔
237
        if _, err := io.ReadFull(crand.Reader, chanIDBytes[:]); err != nil {
14✔
238
                return nil, err
×
239
        }
×
240

241
        shortChanID := lnwire.NewShortChanIDFromInt(
14✔
242
                binary.BigEndian.Uint64(chanIDBytes[:]),
14✔
243
        )
14✔
244

14✔
245
        aliceChannelState := &channeldb.OpenChannel{
14✔
246
                LocalChanCfg:            aliceCfg,
14✔
247
                RemoteChanCfg:           bobCfg,
14✔
248
                IdentityPub:             aliceKeyPub,
14✔
249
                FundingOutpoint:         *prevOut,
14✔
250
                ShortChannelID:          shortChanID,
14✔
251
                ChanType:                channeldb.SingleFunderTweaklessBit,
14✔
252
                IsInitiator:             isAliceInitiator,
14✔
253
                Capacity:                channelCapacity,
14✔
254
                RemoteCurrentRevocation: bobCommitPoint,
14✔
255
                RevocationProducer:      alicePreimageProducer,
14✔
256
                RevocationStore:         shachain.NewRevocationStore(),
14✔
257
                LocalCommitment:         aliceCommit,
14✔
258
                RemoteCommitment:        aliceCommit,
14✔
259
                Db:                      dbAlice.ChannelStateDB(),
14✔
260
                Packager:                channeldb.NewChannelPackager(shortChanID),
14✔
261
                FundingTxn:              channels.TestFundingTx,
14✔
262
        }
14✔
263
        bobChannelState := &channeldb.OpenChannel{
14✔
264
                LocalChanCfg:            bobCfg,
14✔
265
                RemoteChanCfg:           aliceCfg,
14✔
266
                IdentityPub:             bobKeyPub,
14✔
267
                FundingOutpoint:         *prevOut,
14✔
268
                ChanType:                channeldb.SingleFunderTweaklessBit,
14✔
269
                IsInitiator:             !isAliceInitiator,
14✔
270
                Capacity:                channelCapacity,
14✔
271
                RemoteCurrentRevocation: aliceCommitPoint,
14✔
272
                RevocationProducer:      bobPreimageProducer,
14✔
273
                RevocationStore:         shachain.NewRevocationStore(),
14✔
274
                LocalCommitment:         bobCommit,
14✔
275
                RemoteCommitment:        bobCommit,
14✔
276
                Db:                      dbBob.ChannelStateDB(),
14✔
277
                Packager:                channeldb.NewChannelPackager(shortChanID),
14✔
278
        }
14✔
279

14✔
280
        // Set custom values on the channel states.
14✔
281
        updateChan(aliceChannelState, bobChannelState)
14✔
282

14✔
283
        aliceAddr := alicePeer.cfg.Addr.Address
14✔
284
        if err := aliceChannelState.SyncPending(aliceAddr, 0); err != nil {
14✔
285
                return nil, err
×
286
        }
×
287

288
        bobAddr := &net.TCPAddr{
14✔
289
                IP:   net.ParseIP("127.0.0.1"),
14✔
290
                Port: 18556,
14✔
291
        }
14✔
292

14✔
293
        if err := bobChannelState.SyncPending(bobAddr, 0); err != nil {
14✔
294
                return nil, err
×
295
        }
×
296

297
        aliceSigner := input.NewMockSigner(
14✔
298
                []*btcec.PrivateKey{aliceKeyPriv}, nil,
14✔
299
        )
14✔
300
        bobSigner := input.NewMockSigner(
14✔
301
                []*btcec.PrivateKey{bobKeyPriv}, nil,
14✔
302
        )
14✔
303

14✔
304
        alicePool := lnwallet.NewSigPool(1, aliceSigner)
14✔
305
        channelAlice, err := lnwallet.NewLightningChannel(
14✔
306
                aliceSigner, aliceChannelState, alicePool,
14✔
307
                lnwallet.WithLeafStore(&lnwallet.MockAuxLeafStore{}),
14✔
308
                lnwallet.WithAuxSigner(lnwallet.NewAuxSignerMock(
14✔
309
                        lnwallet.EmptyMockJobHandler,
14✔
310
                )),
14✔
311
        )
14✔
312
        if err != nil {
14✔
313
                return nil, err
×
314
        }
×
315
        _ = alicePool.Start()
14✔
316
        t.Cleanup(func() {
28✔
317
                require.NoError(t, alicePool.Stop())
14✔
318
        })
14✔
319

320
        bobPool := lnwallet.NewSigPool(1, bobSigner)
14✔
321
        channelBob, err := lnwallet.NewLightningChannel(
14✔
322
                bobSigner, bobChannelState, bobPool,
14✔
323
                lnwallet.WithLeafStore(&lnwallet.MockAuxLeafStore{}),
14✔
324
                lnwallet.WithAuxSigner(lnwallet.NewAuxSignerMock(
14✔
325
                        lnwallet.EmptyMockJobHandler,
14✔
326
                )),
14✔
327
        )
14✔
328
        if err != nil {
14✔
329
                return nil, err
×
330
        }
×
331
        _ = bobPool.Start()
14✔
332
        t.Cleanup(func() {
28✔
333
                require.NoError(t, bobPool.Stop())
14✔
334
        })
14✔
335

336
        alicePeer.remoteFeatures = lnwire.NewFeatureVector(
14✔
337
                nil, lnwire.Features,
14✔
338
        )
14✔
339

14✔
340
        chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
14✔
341
        alicePeer.activeChannels.Store(chanID, channelAlice)
14✔
342

14✔
343
        alicePeer.wg.Add(1)
14✔
344
        go alicePeer.channelManager()
14✔
345

14✔
346
        return &peerTestCtx{
14✔
347
                peer:       alicePeer,
14✔
348
                channel:    channelBob,
14✔
349
                notifier:   notifier,
14✔
350
                publishTx:  publishTx,
14✔
351
                mockSwitch: mockSwitch,
14✔
352
                mockConn:   params.mockConn,
14✔
353
        }, nil
14✔
354
}
355

356
// mockMessageSwitch is a mock implementation of the messageSwitch interface
357
// used for testing without relying on a *htlcswitch.Switch in unit tests.
358
type mockMessageSwitch struct {
359
        links []htlcswitch.ChannelUpdateHandler
360
}
361

362
// BestHeight currently returns a dummy value.
363
func (m *mockMessageSwitch) BestHeight() uint32 {
×
364
        return 0
×
365
}
×
366

367
// CircuitModifier currently returns a dummy value.
368
func (m *mockMessageSwitch) CircuitModifier() htlcswitch.CircuitModifier {
×
369
        return nil
×
370
}
×
371

372
// RemoveLink currently does nothing.
373
func (m *mockMessageSwitch) RemoveLink(cid lnwire.ChannelID) {}
8✔
374

375
// CreateAndAddLink currently returns a dummy value.
376
func (m *mockMessageSwitch) CreateAndAddLink(cfg htlcswitch.ChannelLinkConfig,
377
        lnChan *lnwallet.LightningChannel) error {
×
378

×
379
        return nil
×
380
}
×
381

382
// GetLinksByInterface returns the active links.
383
func (m *mockMessageSwitch) GetLinksByInterface(pub [33]byte) (
384
        []htlcswitch.ChannelUpdateHandler, error) {
19✔
385

19✔
386
        return m.links, nil
19✔
387
}
19✔
388

389
// mockUpdateHandler is a mock implementation of the ChannelUpdateHandler
390
// interface. It is used in mockMessageSwitch's GetLinksByInterface method.
391
type mockUpdateHandler struct {
392
        cid                  lnwire.ChannelID
393
        isOutgoingAddBlocked atomic.Bool
394
        isIncomingAddBlocked atomic.Bool
395
}
396

397
// newMockUpdateHandler creates a new mockUpdateHandler.
398
func newMockUpdateHandler(cid lnwire.ChannelID) *mockUpdateHandler {
9✔
399
        return &mockUpdateHandler{
9✔
400
                cid: cid,
9✔
401
        }
9✔
402
}
9✔
403

404
// HandleChannelUpdate currently does nothing.
405
func (m *mockUpdateHandler) HandleChannelUpdate(msg lnwire.Message) {}
×
406

407
// ChanID returns the mockUpdateHandler's cid.
408
func (m *mockUpdateHandler) ChanID() lnwire.ChannelID { return m.cid }
18✔
409

410
// Bandwidth currently returns a dummy value.
411
func (m *mockUpdateHandler) Bandwidth() lnwire.MilliSatoshi { return 0 }
×
412

413
// EligibleToForward currently returns a dummy value.
414
func (m *mockUpdateHandler) EligibleToForward() bool { return false }
×
415

416
// MayAddOutgoingHtlc currently returns nil.
417
func (m *mockUpdateHandler) MayAddOutgoingHtlc(lnwire.MilliSatoshi) error { return nil }
×
418

419
type mockMessageConn struct {
420
        t *testing.T
421

422
        // MessageConn embeds our interface so that the mock does not need to
423
        // implement every function. The mock will panic if an unspecified function
424
        // is called.
425
        MessageConn
426

427
        // writtenMessages is a channel that our mock pushes written messages into.
428
        writtenMessages chan []byte
429

430
        readMessages   chan []byte
431
        curReadMessage []byte
432

433
        // writeRaceDetectingCounter is incremented on any function call
434
        // associated with writing to the connection. The race detector will
435
        // trigger on this counter if a data race exists.
436
        writeRaceDetectingCounter int
437

438
        // readRaceDetectingCounter is incremented on any function call
439
        // associated with reading from the connection. The race detector will
440
        // trigger on this counter if a data race exists.
441
        readRaceDetectingCounter int
442
}
443

444
func (m *mockUpdateHandler) EnableAdds(dir htlcswitch.LinkDirection) bool {
×
445
        if dir == htlcswitch.Outgoing {
×
446
                return m.isOutgoingAddBlocked.Swap(false)
×
447
        }
×
448

449
        return m.isIncomingAddBlocked.Swap(false)
×
450
}
451

452
func (m *mockUpdateHandler) DisableAdds(dir htlcswitch.LinkDirection) bool {
12✔
453
        if dir == htlcswitch.Outgoing {
20✔
454
                return !m.isOutgoingAddBlocked.Swap(true)
8✔
455
        }
8✔
456

457
        return !m.isIncomingAddBlocked.Swap(true)
4✔
458
}
459

460
func (m *mockUpdateHandler) IsFlushing(dir htlcswitch.LinkDirection) bool {
×
461
        switch dir {
×
462
        case htlcswitch.Outgoing:
×
463
                return m.isOutgoingAddBlocked.Load()
×
464
        case htlcswitch.Incoming:
×
465
                return m.isIncomingAddBlocked.Load()
×
466
        }
467

468
        return false
×
469
}
470

471
func (m *mockUpdateHandler) OnFlushedOnce(hook func()) {
4✔
472
        hook()
4✔
473
}
4✔
474
func (m *mockUpdateHandler) OnCommitOnce(
475
        _ htlcswitch.LinkDirection, hook func(),
476
) {
8✔
477

8✔
478
        hook()
8✔
479
}
8✔
480

481
func newMockConn(t *testing.T, expectedMessages int) *mockMessageConn {
20✔
482
        return &mockMessageConn{
20✔
483
                t:               t,
20✔
484
                writtenMessages: make(chan []byte, expectedMessages),
20✔
485
                readMessages:    make(chan []byte, 1),
20✔
486
        }
20✔
487
}
20✔
488

489
// SetWriteDeadline mocks setting write deadline for our conn.
490
func (m *mockMessageConn) SetWriteDeadline(time.Time) error {
13✔
491
        m.writeRaceDetectingCounter++
13✔
492
        return nil
13✔
493
}
13✔
494

495
// Flush mocks a message conn flush.
496
func (m *mockMessageConn) Flush() (int, error) {
13✔
497
        m.writeRaceDetectingCounter++
13✔
498
        return 0, nil
13✔
499
}
13✔
500

501
// WriteMessage mocks sending of a message on our connection. It will push
502
// the bytes sent into the mock's writtenMessages channel.
503
func (m *mockMessageConn) WriteMessage(msg []byte) error {
13✔
504
        m.writeRaceDetectingCounter++
13✔
505

13✔
506
        msgCopy := make([]byte, len(msg))
13✔
507
        copy(msgCopy, msg)
13✔
508

13✔
509
        select {
13✔
510
        case m.writtenMessages <- msgCopy:
13✔
511
        case <-time.After(timeout):
×
512
                m.t.Fatalf("timeout sending message: %v", msgCopy)
×
513
        }
514

515
        return nil
13✔
516
}
517

518
// assertWrite asserts that our mock as had WriteMessage called with the byte
519
// slice we expect.
520
func (m *mockMessageConn) assertWrite(expected []byte) {
4✔
521
        select {
4✔
522
        case actual := <-m.writtenMessages:
4✔
523
                require.Equal(m.t, expected, actual)
4✔
524

525
        case <-time.After(timeout):
×
526
                m.t.Fatalf("timeout waiting for write: %v", expected)
×
527
        }
528
}
529

530
func (m *mockMessageConn) SetReadDeadline(t time.Time) error {
11✔
531
        m.readRaceDetectingCounter++
11✔
532
        return nil
11✔
533
}
11✔
534

535
func (m *mockMessageConn) ReadNextHeader() (uint32, error) {
7✔
536
        m.readRaceDetectingCounter++
7✔
537
        m.curReadMessage = <-m.readMessages
7✔
538
        return uint32(len(m.curReadMessage)), nil
7✔
539
}
7✔
540

541
func (m *mockMessageConn) ReadNextBody(buf []byte) ([]byte, error) {
4✔
542
        m.readRaceDetectingCounter++
4✔
543
        return m.curReadMessage, nil
4✔
544
}
4✔
545

546
func (m *mockMessageConn) RemoteAddr() net.Addr {
39✔
547
        return nil
39✔
548
}
39✔
549

550
func (m *mockMessageConn) LocalAddr() net.Addr {
3✔
551
        return nil
3✔
552
}
3✔
553

UNCOV
554
func (m *mockMessageConn) Close() error {
×
UNCOV
555
        return nil
×
UNCOV
556
}
×
557

558
// createTestPeer creates a new peer for testing and returns a context struct
559
// containing necessary handles and mock objects for conducting tests on peer
560
// functionalities.
561
func createTestPeer(t *testing.T) *peerTestCtx {
19✔
562
        nodeKeyLocator := keychain.KeyLocator{
19✔
563
                Family: keychain.KeyFamilyNodeKey,
19✔
564
        }
19✔
565

19✔
566
        aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes(
19✔
567
                channels.AlicesPrivKey,
19✔
568
        )
19✔
569

19✔
570
        aliceKeySigner := keychain.NewPrivKeyMessageSigner(
19✔
571
                aliceKeyPriv, nodeKeyLocator,
19✔
572
        )
19✔
573

19✔
574
        aliceAddr := &net.TCPAddr{
19✔
575
                IP:   net.ParseIP("127.0.0.1"),
19✔
576
                Port: 18555,
19✔
577
        }
19✔
578
        cfgAddr := &lnwire.NetAddress{
19✔
579
                IdentityKey: aliceKeyPub,
19✔
580
                Address:     aliceAddr,
19✔
581
                ChainNet:    wire.SimNet,
19✔
582
        }
19✔
583

19✔
584
        errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize)
19✔
585
        require.NoError(t, err)
19✔
586

19✔
587
        chainIO := &mock.ChainIO{
19✔
588
                BestHeight: broadcastHeight,
19✔
589
        }
19✔
590

19✔
591
        publishTx := make(chan *wire.MsgTx)
19✔
592
        wallet := &lnwallet.LightningWallet{
19✔
593
                WalletController: &mock.WalletController{
19✔
594
                        RootKey:               aliceKeyPriv,
19✔
595
                        PublishedTransactions: publishTx,
19✔
596
                },
19✔
597
        }
19✔
598

19✔
599
        const chanActiveTimeout = time.Minute
19✔
600

19✔
601
        dbAlice, err := channeldb.Open(t.TempDir())
19✔
602
        require.NoError(t, err)
19✔
603
        t.Cleanup(func() {
38✔
604
                require.NoError(t, dbAlice.Close())
19✔
605
        })
19✔
606

607
        nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner)
19✔
608

19✔
609
        chanStatusMgr, err := netann.NewChanStatusManager(&netann.
19✔
610
                ChanStatusConfig{
19✔
611
                ChanStatusSampleInterval: 30 * time.Second,
19✔
612
                ChanEnableTimeout:        chanActiveTimeout,
19✔
613
                ChanDisableTimeout:       2 * time.Minute,
19✔
614
                DB:                       dbAlice.ChannelStateDB(),
19✔
615
                Graph:                    dbAlice.ChannelGraph(),
19✔
616
                MessageSigner:            nodeSignerAlice,
19✔
617
                OurPubKey:                aliceKeyPub,
19✔
618
                OurKeyLoc:                testKeyLoc,
19✔
619
                IsChannelActive: func(lnwire.ChannelID) bool {
19✔
620
                        return true
×
621
                },
×
622
                ApplyChannelUpdate: func(*lnwire.ChannelUpdate1,
623
                        *wire.OutPoint, bool) error {
×
624

×
625
                        return nil
×
626
                },
×
627
        })
628
        require.NoError(t, err)
19✔
629

19✔
630
        interceptableSwitchNotifier := &mock.ChainNotifier{
19✔
631
                EpochChan: make(chan *chainntnfs.BlockEpoch, 1),
19✔
632
        }
19✔
633
        interceptableSwitchNotifier.EpochChan <- &chainntnfs.BlockEpoch{
19✔
634
                Height: 1,
19✔
635
        }
19✔
636

19✔
637
        interceptableSwitch, err := htlcswitch.NewInterceptableSwitch(
19✔
638
                &htlcswitch.InterceptableSwitchConfig{
19✔
639
                        CltvRejectDelta:    testCltvRejectDelta,
19✔
640
                        CltvInterceptDelta: testCltvRejectDelta + 3,
19✔
641
                        Notifier:           interceptableSwitchNotifier,
19✔
642
                },
19✔
643
        )
19✔
644
        require.NoError(t, err)
19✔
645

19✔
646
        // TODO(yy): create interface for lnwallet.LightningChannel so we can
19✔
647
        // easily mock it without the following setups.
19✔
648
        notifier := &mock.ChainNotifier{
19✔
649
                SpendChan: make(chan *chainntnfs.SpendDetail),
19✔
650
                EpochChan: make(chan *chainntnfs.BlockEpoch),
19✔
651
                ConfChan:  make(chan *chainntnfs.TxConfirmation),
19✔
652
        }
19✔
653

19✔
654
        mockSwitch := &mockMessageSwitch{}
19✔
655

19✔
656
        // TODO(yy): change ChannelNotifier to be an interface.
19✔
657
        channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
19✔
658
        require.NoError(t, channelNotifier.Start())
19✔
659
        t.Cleanup(func() {
38✔
660
                require.NoError(t, channelNotifier.Stop(),
19✔
661
                        "stop channel notifier failed")
19✔
662
        })
19✔
663

664
        writeBufferPool := pool.NewWriteBuffer(
19✔
665
                pool.DefaultWriteBufferGCInterval,
19✔
666
                pool.DefaultWriteBufferExpiryInterval,
19✔
667
        )
19✔
668

19✔
669
        writePool := pool.NewWrite(
19✔
670
                writeBufferPool, 1, timeout,
19✔
671
        )
19✔
672
        require.NoError(t, writePool.Start())
19✔
673

19✔
674
        readBufferPool := pool.NewReadBuffer(
19✔
675
                pool.DefaultReadBufferGCInterval,
19✔
676
                pool.DefaultReadBufferExpiryInterval,
19✔
677
        )
19✔
678

19✔
679
        readPool := pool.NewRead(
19✔
680
                readBufferPool, 1, timeout,
19✔
681
        )
19✔
682
        require.NoError(t, readPool.Start())
19✔
683

19✔
684
        mockConn := newMockConn(t, 1)
19✔
685

19✔
686
        receivedCustomChan := make(chan *customMsg)
19✔
687

19✔
688
        var pubKey [33]byte
19✔
689
        copy(pubKey[:], aliceKeyPub.SerializeCompressed())
19✔
690

19✔
691
        estimator := chainfee.NewStaticEstimator(12500, 0)
19✔
692

19✔
693
        cfg := &Config{
19✔
694
                Addr:              cfgAddr,
19✔
695
                PubKeyBytes:       pubKey,
19✔
696
                ErrorBuffer:       errBuffer,
19✔
697
                ChainIO:           chainIO,
19✔
698
                Switch:            mockSwitch,
19✔
699
                ChanActiveTimeout: chanActiveTimeout,
19✔
700
                InterceptSwitch:   interceptableSwitch,
19✔
701
                ChannelDB:         dbAlice.ChannelStateDB(),
19✔
702
                FeeEstimator:      estimator,
19✔
703
                Wallet:            wallet,
19✔
704
                ChainNotifier:     notifier,
19✔
705
                ChanStatusMgr:     chanStatusMgr,
19✔
706
                Features: lnwire.NewFeatureVector(
19✔
707
                        nil, lnwire.Features,
19✔
708
                ),
19✔
709
                DisconnectPeer: func(b *btcec.PublicKey) error {
19✔
710
                        return nil
×
711
                },
×
712
                ChannelNotifier:               channelNotifier,
713
                PrunePersistentPeerConnection: func([33]byte) {},
1✔
714
                LegacyFeatures:                lnwire.EmptyFeatureVector(),
715
                WritePool:                     writePool,
716
                ReadPool:                      readPool,
717
                Conn:                          mockConn,
718
                HandleCustomMessage: func(
719
                        peer [33]byte, msg *lnwire.Custom) error {
1✔
720

1✔
721
                        receivedCustomChan <- &customMsg{
1✔
722
                                peer: peer,
1✔
723
                                msg:  *msg,
1✔
724
                        }
1✔
725

1✔
726
                        return nil
1✔
727
                },
1✔
728
                PongBuf: make([]byte, lnwire.MaxPongBytes),
729
                FetchLastChanUpdate: func(chanID lnwire.ShortChannelID,
730
                ) (*lnwire.ChannelUpdate1, error) {
2✔
731

2✔
732
                        return &lnwire.ChannelUpdate1{}, nil
2✔
733
                },
2✔
734
        }
735

736
        alicePeer := NewBrontide(*cfg)
19✔
737

19✔
738
        return &peerTestCtx{
19✔
739
                publishTx:     publishTx,
19✔
740
                mockSwitch:    mockSwitch,
19✔
741
                peer:          alicePeer,
19✔
742
                notifier:      notifier,
19✔
743
                db:            dbAlice,
19✔
744
                privKey:       aliceKeyPriv,
19✔
745
                mockConn:      mockConn,
19✔
746
                customChan:    receivedCustomChan,
19✔
747
                chanStatusMgr: chanStatusMgr,
19✔
748
        }
19✔
749
}
750

751
// startPeer invokes the `Start` method on the specified peer and handles any
752
// initial startup messages for testing.
753
func startPeer(t *testing.T, mockConn *mockMessageConn,
754
        peer *Brontide) <-chan struct{} {
3✔
755

3✔
756
        // Start the peer in a goroutine so that we can handle and test for
3✔
757
        // startup messages. Successfully sending and receiving init message,
3✔
758
        // indicates a successful startup.
3✔
759
        done := make(chan struct{})
3✔
760
        go func() {
6✔
761
                require.NoError(t, peer.Start())
3✔
762
                close(done)
3✔
763
        }()
3✔
764

765
        // Receive the init message that should be the first message received on
766
        // startup.
767
        rawMsg, err := fn.RecvOrTimeout[[]byte](
3✔
768
                mockConn.writtenMessages, timeout,
3✔
769
        )
3✔
770
        require.NoError(t, err)
3✔
771

3✔
772
        msgReader := bytes.NewReader(rawMsg)
3✔
773
        nextMsg, err := lnwire.ReadMessage(msgReader, 0)
3✔
774
        require.NoError(t, err)
3✔
775

3✔
776
        _, ok := nextMsg.(*lnwire.Init)
3✔
777
        require.True(t, ok)
3✔
778

3✔
779
        // Write the reply for the init message to complete the startup.
3✔
780
        initReplyMsg := lnwire.NewInitMessage(
3✔
781
                lnwire.NewRawFeatureVector(
3✔
782
                        lnwire.DataLossProtectRequired,
3✔
783
                        lnwire.GossipQueriesOptional,
3✔
784
                ),
3✔
785
                lnwire.NewRawFeatureVector(),
3✔
786
        )
3✔
787

3✔
788
        var b bytes.Buffer
3✔
789
        _, err = lnwire.WriteMessage(&b, initReplyMsg, 0)
3✔
790
        require.NoError(t, err)
3✔
791

3✔
792
        ok = fn.SendOrQuit[[]byte, struct{}](
3✔
793
                mockConn.readMessages, b.Bytes(), make(chan struct{}),
3✔
794
        )
3✔
795
        require.True(t, ok)
3✔
796

3✔
797
        return done
3✔
798
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc