• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13566028875

27 Feb 2025 12:09PM UTC coverage: 49.396% (-9.4%) from 58.748%
13566028875

Pull #9555

github

ellemouton
graph/db: populate the graph cache in Start instead of during construction

In this commit, we move the graph cache population logic out of the
ChannelGraph constructor and into its Start method instead.
Pull Request #9555: graph: extract cache from CRUD [6]

34 of 54 new or added lines in 4 files covered. (62.96%)

27464 existing lines in 436 files now uncovered.

101095 of 204664 relevant lines covered (49.4%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/watchtower/wtdb/migration8/migration.go
1
package migration8
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8

9
        "github.com/lightningnetwork/lnd/kvdb"
10
)
11

12
var (
13
        // cSessionBkt is a top-level bucket storing:
14
        //   session-id => cSessionBody -> encoded ClientSessionBody
15
        //                 => cSessionDBID -> db-assigned-id
16
        //              => cSessionCommits => seqnum -> encoded CommittedUpdate
17
        //              => cSessionAckRangeIndex => db-chan-id => start -> end
18
        //                 => cSessionRogueUpdateCount -> count
19
        cSessionBkt = []byte("client-session-bucket")
20

21
        // cChanIDIndexBkt is a top-level bucket storing:
22
        //    db-assigned-id -> channel-ID
23
        cChanIDIndexBkt = []byte("client-channel-id-index")
24

25
        // cSessionAckRangeIndex is a sub-bucket of cSessionBkt storing
26
        //    chan-id => start -> end
27
        cSessionAckRangeIndex = []byte("client-session-ack-range-index")
28

29
        // cSessionBody is a sub-bucket of cSessionBkt storing:
30
        //    seqnum -> encoded CommittedUpdate.
31
        cSessionCommits = []byte("client-session-commits")
32

33
        // cChanDetailsBkt is a top-level bucket storing:
34
        //   channel-id => cChannelSummary -> encoded ClientChanSummary.
35
        //                  => cChanDBID -> db-assigned-id
36
        //                 => cChanSessions => db-session-id -> 1
37
        //                 => cChanClosedHeight -> block-height
38
        //                 => cChanMaxCommitmentHeight -> commitment-height
39
        cChanDetailsBkt = []byte("client-channel-detail-bucket")
40

41
        cChanMaxCommitmentHeight = []byte(
42
                "client-channel-max-commitment-height",
43
        )
44

45
        // ErrUninitializedDB signals that top-level buckets for the database
46
        // have not been initialized.
47
        ErrUninitializedDB = errors.New("db not initialized")
48

49
        byteOrder = binary.BigEndian
50
)
51

52
// MigrateChannelMaxHeights migrates the tower client db by collecting all the
53
// max commitment heights that have been backed up for each channel and then
54
// storing those heights alongside the channel info.
UNCOV
55
func MigrateChannelMaxHeights(tx kvdb.RwTx) error {
×
UNCOV
56
        log.Infof("Migrating the tower client DB for quick channel max " +
×
UNCOV
57
                "commitment height lookup")
×
UNCOV
58

×
UNCOV
59
        heights, err := collectChanMaxHeights(tx)
×
UNCOV
60
        if err != nil {
×
61
                return err
×
62
        }
×
63

UNCOV
64
        return writeChanMaxHeights(tx, heights)
×
65
}
66

67
// writeChanMaxHeights iterates over the given channel ID to height map and
68
// writes an entry under the cChanMaxCommitmentHeight key for each channel.
UNCOV
69
func writeChanMaxHeights(tx kvdb.RwTx, heights map[ChannelID]uint64) error {
×
UNCOV
70
        chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
×
UNCOV
71
        if chanDetailsBkt == nil {
×
72
                return ErrUninitializedDB
×
73
        }
×
74

UNCOV
75
        for chanID, maxHeight := range heights {
×
UNCOV
76
                chanDetails := chanDetailsBkt.NestedReadWriteBucket(chanID[:])
×
UNCOV
77

×
UNCOV
78
                // If the details bucket for this channel ID does not exist,
×
UNCOV
79
                // it is probably a channel that has been closed and deleted
×
UNCOV
80
                // already. So we can skip this height.
×
UNCOV
81
                if chanDetails == nil {
×
UNCOV
82
                        continue
×
83
                }
84

UNCOV
85
                b, err := writeBigSize(maxHeight)
×
UNCOV
86
                if err != nil {
×
87
                        return err
×
88
                }
×
89

UNCOV
90
                err = chanDetails.Put(cChanMaxCommitmentHeight, b)
×
UNCOV
91
                if err != nil {
×
92
                        return err
×
93
                }
×
94
        }
95

UNCOV
96
        return nil
×
97
}
98

99
// collectChanMaxHeights iterates over all the sessions in the DB. For each
100
// session, it iterates over all the Acked updates and the committed updates
101
// to collect the maximum commitment height for each channel.
UNCOV
102
func collectChanMaxHeights(tx kvdb.RwTx) (map[ChannelID]uint64, error) {
×
UNCOV
103
        sessionsBkt := tx.ReadBucket(cSessionBkt)
×
UNCOV
104
        if sessionsBkt == nil {
×
105
                return nil, ErrUninitializedDB
×
106
        }
×
107

UNCOV
108
        chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
×
UNCOV
109
        if chanIDIndexBkt == nil {
×
110
                return nil, ErrUninitializedDB
×
111
        }
×
112

UNCOV
113
        heights := make(map[ChannelID]uint64)
×
UNCOV
114

×
UNCOV
115
        // For each update we consider, we will only update the heights map if
×
UNCOV
116
        // the commitment height for the channel is larger than the current
×
UNCOV
117
        // max height stored for the channel.
×
UNCOV
118
        cb := func(chanID ChannelID, commitHeight uint64) {
×
UNCOV
119
                if commitHeight > heights[chanID] {
×
UNCOV
120
                        heights[chanID] = commitHeight
×
UNCOV
121
                }
×
122
        }
123

UNCOV
124
        err := sessionsBkt.ForEach(func(sessIDBytes, _ []byte) error {
×
UNCOV
125
                sessBkt := sessionsBkt.NestedReadBucket(sessIDBytes)
×
UNCOV
126
                if sessBkt == nil {
×
127
                        return fmt.Errorf("bucket not found for session %x",
×
128
                                sessIDBytes)
×
129
                }
×
130

UNCOV
131
                err := forEachCommittedUpdate(sessBkt, cb)
×
UNCOV
132
                if err != nil {
×
133
                        return err
×
134
                }
×
135

UNCOV
136
                return forEachAckedUpdate(sessBkt, chanIDIndexBkt, cb)
×
137
        })
UNCOV
138
        if err != nil {
×
139
                return nil, err
×
140
        }
×
141

UNCOV
142
        return heights, nil
×
143
}
144

145
// forEachCommittedUpdate iterates over all the given session's committed
146
// updates and calls the call-back for each.
147
func forEachCommittedUpdate(sessBkt kvdb.RBucket,
UNCOV
148
        cb func(chanID ChannelID, commitHeight uint64)) error {
×
UNCOV
149

×
UNCOV
150
        sessionCommits := sessBkt.NestedReadBucket(cSessionCommits)
×
UNCOV
151
        if sessionCommits == nil {
×
UNCOV
152
                return nil
×
UNCOV
153
        }
×
154

UNCOV
155
        return sessionCommits.ForEach(func(k, v []byte) error {
×
UNCOV
156
                var update CommittedUpdate
×
UNCOV
157
                err := update.Decode(bytes.NewReader(v))
×
UNCOV
158
                if err != nil {
×
159
                        return err
×
160
                }
×
161

UNCOV
162
                cb(update.BackupID.ChanID, update.BackupID.CommitHeight)
×
UNCOV
163

×
UNCOV
164
                return nil
×
165
        })
166
}
167

168
// forEachAckedUpdate iterates over all the given session's acked update range
169
// indices and calls the call-back for each.
170
func forEachAckedUpdate(sessBkt, chanIDIndexBkt kvdb.RBucket,
UNCOV
171
        cb func(chanID ChannelID, commitHeight uint64)) error {
×
UNCOV
172

×
UNCOV
173
        sessionAcksRanges := sessBkt.NestedReadBucket(cSessionAckRangeIndex)
×
UNCOV
174
        if sessionAcksRanges == nil {
×
UNCOV
175
                return nil
×
UNCOV
176
        }
×
177

UNCOV
178
        return sessionAcksRanges.ForEach(func(dbChanID, _ []byte) error {
×
UNCOV
179
                rangeBkt := sessionAcksRanges.NestedReadBucket(dbChanID)
×
UNCOV
180
                if rangeBkt == nil {
×
181
                        return nil
×
182
                }
×
183

UNCOV
184
                index, err := readRangeIndex(rangeBkt)
×
UNCOV
185
                if err != nil {
×
186
                        return err
×
187
                }
×
188

UNCOV
189
                chanIDBytes := chanIDIndexBkt.Get(dbChanID)
×
UNCOV
190
                var chanID ChannelID
×
UNCOV
191
                copy(chanID[:], chanIDBytes)
×
UNCOV
192

×
UNCOV
193
                cb(chanID, index.MaxHeight())
×
UNCOV
194

×
UNCOV
195
                return nil
×
196
        })
197
}
198

199
// readRangeIndex reads a persisted RangeIndex from the passed bucket and into
200
// a new in-memory RangeIndex.
UNCOV
201
func readRangeIndex(rangesBkt kvdb.RBucket) (*RangeIndex, error) {
×
UNCOV
202
        ranges := make(map[uint64]uint64)
×
UNCOV
203
        err := rangesBkt.ForEach(func(k, v []byte) error {
×
UNCOV
204
                start, err := readBigSize(k)
×
UNCOV
205
                if err != nil {
×
206
                        return err
×
207
                }
×
208

UNCOV
209
                end, err := readBigSize(v)
×
UNCOV
210
                if err != nil {
×
211
                        return err
×
212
                }
×
213

UNCOV
214
                ranges[start] = end
×
UNCOV
215

×
UNCOV
216
                return nil
×
217
        })
UNCOV
218
        if err != nil {
×
219
                return nil, err
×
220
        }
×
221

UNCOV
222
        return NewRangeIndex(ranges, WithSerializeUint64Fn(writeBigSize))
×
223
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc