• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15561477203

10 Jun 2025 01:54PM UTC coverage: 58.351% (-10.1%) from 68.487%
15561477203

Pull #9356

github

web-flow
Merge 6440b25db into c6d6d4c0b
Pull Request #9356: lnrpc: add incoming/outgoing channel ids filter to forwarding history request

33 of 36 new or added lines in 2 files covered. (91.67%)

28366 existing lines in 455 files now uncovered.

97715 of 167461 relevant lines covered (58.35%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/watchtower/wtdb/migration8/migration.go
1
package migration8
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8

9
        "github.com/lightningnetwork/lnd/kvdb"
10
)
11

12
var (
13
        // cSessionBkt is a top-level bucket storing:
14
        //   session-id => cSessionBody -> encoded ClientSessionBody
15
        //                 => cSessionDBID -> db-assigned-id
16
        //              => cSessionCommits => seqnum -> encoded CommittedUpdate
17
        //              => cSessionAckRangeIndex => db-chan-id => start -> end
18
        //                 => cSessionRogueUpdateCount -> count
19
        cSessionBkt = []byte("client-session-bucket")
20

21
        // cChanIDIndexBkt is a top-level bucket storing:
22
        //    db-assigned-id -> channel-ID
23
        cChanIDIndexBkt = []byte("client-channel-id-index")
24

25
        // cSessionAckRangeIndex is a sub-bucket of cSessionBkt storing
26
        //    chan-id => start -> end
27
        cSessionAckRangeIndex = []byte("client-session-ack-range-index")
28

29
        // cSessionBody is a sub-bucket of cSessionBkt storing:
30
        //    seqnum -> encoded CommittedUpdate.
31
        cSessionCommits = []byte("client-session-commits")
32

33
        // cChanDetailsBkt is a top-level bucket storing:
34
        //   channel-id => cChannelSummary -> encoded ClientChanSummary.
35
        //                  => cChanDBID -> db-assigned-id
36
        //                 => cChanSessions => db-session-id -> 1
37
        //                 => cChanClosedHeight -> block-height
38
        //                 => cChanMaxCommitmentHeight -> commitment-height
39
        cChanDetailsBkt = []byte("client-channel-detail-bucket")
40

41
        cChanMaxCommitmentHeight = []byte(
42
                "client-channel-max-commitment-height",
43
        )
44

45
        // ErrUninitializedDB signals that top-level buckets for the database
46
        // have not been initialized.
47
        ErrUninitializedDB = errors.New("db not initialized")
48

49
        byteOrder = binary.BigEndian
50
)
51

52
// MigrateChannelMaxHeights migrates the tower client db by collecting all the
53
// max commitment heights that have been backed up for each channel and then
54
// storing those heights alongside the channel info.
UNCOV
55
func MigrateChannelMaxHeights(tx kvdb.RwTx) error {
×
UNCOV
56
        log.Infof("Migrating the tower client DB for quick channel max " +
×
UNCOV
57
                "commitment height lookup")
×
UNCOV
58

×
UNCOV
59
        heights, err := collectChanMaxHeights(tx)
×
UNCOV
60
        if err != nil {
×
61
                return err
×
62
        }
×
63

UNCOV
64
        return writeChanMaxHeights(tx, heights)
×
65
}
66

67
// writeChanMaxHeights iterates over the given channel ID to height map and
68
// writes an entry under the cChanMaxCommitmentHeight key for each channel.
UNCOV
69
func writeChanMaxHeights(tx kvdb.RwTx, heights map[ChannelID]uint64) error {
×
UNCOV
70
        chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
×
UNCOV
71
        if chanDetailsBkt == nil {
×
72
                return ErrUninitializedDB
×
73
        }
×
74

UNCOV
75
        for chanID, maxHeight := range heights {
×
UNCOV
76
                chanDetails := chanDetailsBkt.NestedReadWriteBucket(chanID[:])
×
UNCOV
77

×
UNCOV
78
                // If the details bucket for this channel ID does not exist,
×
UNCOV
79
                // it is probably a channel that has been closed and deleted
×
UNCOV
80
                // already. So we can skip this height.
×
UNCOV
81
                if chanDetails == nil {
×
UNCOV
82
                        continue
×
83
                }
84

UNCOV
85
                b, err := writeBigSize(maxHeight)
×
UNCOV
86
                if err != nil {
×
87
                        return err
×
88
                }
×
89

UNCOV
90
                err = chanDetails.Put(cChanMaxCommitmentHeight, b)
×
UNCOV
91
                if err != nil {
×
92
                        return err
×
93
                }
×
94
        }
95

UNCOV
96
        return nil
×
97
}
98

99
// collectChanMaxHeights iterates over all the sessions in the DB. For each
100
// session, it iterates over all the Acked updates and the committed updates
101
// to collect the maximum commitment height for each channel.
UNCOV
102
func collectChanMaxHeights(tx kvdb.RwTx) (map[ChannelID]uint64, error) {
×
UNCOV
103
        sessionsBkt := tx.ReadBucket(cSessionBkt)
×
UNCOV
104
        if sessionsBkt == nil {
×
105
                return nil, ErrUninitializedDB
×
106
        }
×
107

UNCOV
108
        chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
×
UNCOV
109
        if chanIDIndexBkt == nil {
×
110
                return nil, ErrUninitializedDB
×
111
        }
×
112

UNCOV
113
        heights := make(map[ChannelID]uint64)
×
UNCOV
114

×
UNCOV
115
        // For each update we consider, we will only update the heights map if
×
UNCOV
116
        // the commitment height for the channel is larger than the current
×
UNCOV
117
        // max height stored for the channel.
×
UNCOV
118
        cb := func(chanID ChannelID, commitHeight uint64) {
×
UNCOV
119
                if commitHeight > heights[chanID] {
×
UNCOV
120
                        heights[chanID] = commitHeight
×
UNCOV
121
                }
×
122
        }
123

UNCOV
124
        err := sessionsBkt.ForEach(func(sessIDBytes, _ []byte) error {
×
UNCOV
125
                sessBkt := sessionsBkt.NestedReadBucket(sessIDBytes)
×
UNCOV
126
                if sessBkt == nil {
×
127
                        return fmt.Errorf("bucket not found for session %x",
×
128
                                sessIDBytes)
×
129
                }
×
130

UNCOV
131
                err := forEachCommittedUpdate(sessBkt, cb)
×
UNCOV
132
                if err != nil {
×
133
                        return err
×
134
                }
×
135

UNCOV
136
                return forEachAckedUpdate(sessBkt, chanIDIndexBkt, cb)
×
137
        })
UNCOV
138
        if err != nil {
×
139
                return nil, err
×
140
        }
×
141

UNCOV
142
        return heights, nil
×
143
}
144

145
// forEachCommittedUpdate iterates over all the given session's committed
146
// updates and calls the call-back for each.
147
func forEachCommittedUpdate(sessBkt kvdb.RBucket,
UNCOV
148
        cb func(chanID ChannelID, commitHeight uint64)) error {
×
UNCOV
149

×
UNCOV
150
        sessionCommits := sessBkt.NestedReadBucket(cSessionCommits)
×
UNCOV
151
        if sessionCommits == nil {
×
UNCOV
152
                return nil
×
UNCOV
153
        }
×
154

UNCOV
155
        return sessionCommits.ForEach(func(k, v []byte) error {
×
UNCOV
156
                var update CommittedUpdate
×
UNCOV
157
                err := update.Decode(bytes.NewReader(v))
×
UNCOV
158
                if err != nil {
×
159
                        return err
×
160
                }
×
161

UNCOV
162
                cb(update.BackupID.ChanID, update.BackupID.CommitHeight)
×
UNCOV
163

×
UNCOV
164
                return nil
×
165
        })
166
}
167

168
// forEachAckedUpdate iterates over all the given session's acked update range
169
// indices and calls the call-back for each.
170
func forEachAckedUpdate(sessBkt, chanIDIndexBkt kvdb.RBucket,
UNCOV
171
        cb func(chanID ChannelID, commitHeight uint64)) error {
×
UNCOV
172

×
UNCOV
173
        sessionAcksRanges := sessBkt.NestedReadBucket(cSessionAckRangeIndex)
×
UNCOV
174
        if sessionAcksRanges == nil {
×
UNCOV
175
                return nil
×
UNCOV
176
        }
×
177

UNCOV
178
        return sessionAcksRanges.ForEach(func(dbChanID, _ []byte) error {
×
UNCOV
179
                rangeBkt := sessionAcksRanges.NestedReadBucket(dbChanID)
×
UNCOV
180
                if rangeBkt == nil {
×
181
                        return nil
×
182
                }
×
183

UNCOV
184
                index, err := readRangeIndex(rangeBkt)
×
UNCOV
185
                if err != nil {
×
186
                        return err
×
187
                }
×
188

UNCOV
189
                chanIDBytes := chanIDIndexBkt.Get(dbChanID)
×
UNCOV
190
                var chanID ChannelID
×
UNCOV
191
                copy(chanID[:], chanIDBytes)
×
UNCOV
192

×
UNCOV
193
                cb(chanID, index.MaxHeight())
×
UNCOV
194

×
UNCOV
195
                return nil
×
196
        })
197
}
198

199
// readRangeIndex reads a persisted RangeIndex from the passed bucket and into
200
// a new in-memory RangeIndex.
UNCOV
201
func readRangeIndex(rangesBkt kvdb.RBucket) (*RangeIndex, error) {
×
UNCOV
202
        ranges := make(map[uint64]uint64)
×
UNCOV
203
        err := rangesBkt.ForEach(func(k, v []byte) error {
×
UNCOV
204
                start, err := readBigSize(k)
×
UNCOV
205
                if err != nil {
×
206
                        return err
×
207
                }
×
208

UNCOV
209
                end, err := readBigSize(v)
×
UNCOV
210
                if err != nil {
×
211
                        return err
×
212
                }
×
213

UNCOV
214
                ranges[start] = end
×
UNCOV
215

×
UNCOV
216
                return nil
×
217
        })
UNCOV
218
        if err != nil {
×
219
                return nil, err
×
220
        }
×
221

UNCOV
222
        return NewRangeIndex(ranges, WithSerializeUint64Fn(writeBigSize))
×
223
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc