• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16969463412

14 Aug 2025 03:23PM UTC coverage: 66.776% (-0.2%) from 66.929%
16969463412

push

github

web-flow
Merge pull request #10155 from ziggie1984/add-missing-invoice-settle-index

Add missing invoice index for native sql

135916 of 203540 relevant lines covered (66.78%)

21469.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.0
/sqldb/paginate.go
1
package sqldb
2

3
import (
4
        "context"
5
        "fmt"
6
)
7

8
const (
	// maxSQLiteBatchSize is the maximum number of items that can be
	// included in a batch query IN clause for SQLite. This was determined
	// using the TestSQLSliceQueries test.
	maxSQLiteBatchSize = 32766

	// maxPostgresBatchSize is the maximum number of items that can be
	// included in a batch query IN clause for Postgres. This was determined
	// using the TestSQLSliceQueries test.
	maxPostgresBatchSize = 65535

	// defaultSQLitePageSize is the default page size for SQLite queries.
	defaultSQLitePageSize = 100

	// defaultPostgresPageSize is the default page size for Postgres
	// queries.
	defaultPostgresPageSize = 10500

	// defaultSQLiteBatchSize is the default batch size for SQLite queries.
	defaultSQLiteBatchSize = 250

	// defaultPostgresBatchSize is the default batch size for Postgres
	// queries.
	defaultPostgresBatchSize = 5000
)
33

34
// QueryConfig holds configuration values for SQL queries.
//
//nolint:ll
type QueryConfig struct {
	// MaxBatchSize is the maximum number of items included in a batch
	// query IN clauses list.
	MaxBatchSize uint32 `long:"max-batch-size" description:"The maximum number of items to include in a batch query IN clause. This is used for queries that fetch results based on a list of identifiers."`

	// MaxPageSize is the maximum number of items returned in a single page
	// of results. This is used for paginated queries.
	MaxPageSize uint32 `long:"max-page-size" description:"The maximum number of items to return in a single page of results. This is used for paginated queries."`
}
46

47
// Validate checks that the QueryConfig values are valid.
48
func (c *QueryConfig) Validate(sqlite bool) error {
×
49
        if c.MaxBatchSize <= 0 {
×
50
                return fmt.Errorf("max batch size must be greater than "+
×
51
                        "zero, got %d", c.MaxBatchSize)
×
52
        }
×
53
        if c.MaxPageSize <= 0 {
×
54
                return fmt.Errorf("max page size must be greater than "+
×
55
                        "zero, got %d", c.MaxPageSize)
×
56
        }
×
57

58
        if sqlite {
×
59
                if c.MaxBatchSize > maxSQLiteBatchSize {
×
60
                        return fmt.Errorf("max batch size for SQLite cannot "+
×
61
                                "exceed %d, got %d", maxSQLiteBatchSize,
×
62
                                c.MaxBatchSize)
×
63
                }
×
64
        } else {
×
65
                if c.MaxBatchSize > maxPostgresBatchSize {
×
66
                        return fmt.Errorf("max batch size for Postgres cannot "+
×
67
                                "exceed %d, got %d", maxPostgresBatchSize,
×
68
                                c.MaxBatchSize)
×
69
                }
×
70
        }
71

72
        return nil
×
73
}
74

75
// DefaultSQLiteConfig returns a default configuration for SQL queries to a
76
// SQLite backend.
77
func DefaultSQLiteConfig() *QueryConfig {
2✔
78
        return &QueryConfig{
2✔
79
                MaxBatchSize: defaultSQLiteBatchSize,
2✔
80
                MaxPageSize:  defaultSQLitePageSize,
2✔
81
        }
2✔
82
}
2✔
83

84
// DefaultPostgresConfig returns a default configuration for SQL queries to a
85
// Postgres backend.
86
func DefaultPostgresConfig() *QueryConfig {
2✔
87
        return &QueryConfig{
2✔
88
                MaxBatchSize: defaultPostgresBatchSize,
2✔
89
                MaxPageSize:  defaultPostgresPageSize,
2✔
90
        }
2✔
91
}
2✔
92

93
// BatchQueryFunc represents a function that takes a batch of converted items
// and returns results.
type BatchQueryFunc[T any, R any] func(context.Context, []T) ([]R, error)

// ItemCallbackFunc represents a function that processes individual results.
type ItemCallbackFunc[R any] func(context.Context, R) error

// ConvertFunc represents a function that converts from input type to query type
// for the batch query.
type ConvertFunc[I any, T any] func(I) T
103

104
// ExecuteBatchQuery executes a query in batches over a slice of input items.
105
// It converts the input items to a query type using the provided convertFunc,
106
// executes the query in batches using the provided queryFunc, and applies
107
// the callback to each result. This is useful for queries using the
108
// "WHERE x IN []slice" pattern. It takes that slice, splits it into batches of
109
// size MaxBatchSize, and executes the query for each batch.
110
//
111
// NOTE: it is the caller's responsibility to ensure that the expected return
112
// results are unique across all pages. Meaning that if the input items are
113
// split up, a result that is returned in one page should not be expected to
114
// be returned in another page.
115
func ExecuteBatchQuery[I any, T any, R any](ctx context.Context,
116
        cfg *QueryConfig, inputItems []I, convertFunc ConvertFunc[I, T],
117
        queryFunc BatchQueryFunc[T, R], callback ItemCallbackFunc[R]) error {
×
118

×
119
        if len(inputItems) == 0 {
×
120
                return nil
×
121
        }
×
122

123
        // Process items in pages.
124
        for i := 0; i < len(inputItems); i += int(cfg.MaxBatchSize) {
×
125
                // Calculate the end index for this page.
×
126
                end := i + int(cfg.MaxBatchSize)
×
127
                if end > len(inputItems) {
×
128
                        end = len(inputItems)
×
129
                }
×
130

131
                // Get the page slice of input items.
132
                inputPage := inputItems[i:end]
×
133

×
134
                // Convert only the items needed for this page.
×
135
                convertedPage := make([]T, len(inputPage))
×
136
                for j, inputItem := range inputPage {
×
137
                        convertedPage[j] = convertFunc(inputItem)
×
138
                }
×
139

140
                // Execute the query for this page.
141
                results, err := queryFunc(ctx, convertedPage)
×
142
                if err != nil {
×
143
                        return fmt.Errorf("query failed for page "+
×
144
                                "starting at %d: %w", i, err)
×
145
                }
×
146

147
                // Apply the callback to each result.
148
                for _, result := range results {
×
149
                        if err := callback(ctx, result); err != nil {
×
150
                                return fmt.Errorf("callback failed for "+
×
151
                                        "result: %w", err)
×
152
                        }
×
153
                }
154
        }
155

156
        return nil
×
157
}
158

159
// PagedQueryFunc represents a function that fetches a page of results using a
// cursor. It returns the fetched items and should return an empty slice when no
// more results.
type PagedQueryFunc[C any, T any] func(context.Context, C, int32) ([]T, error)

// CursorExtractFunc represents a function that extracts the cursor value from
// an item. This cursor will be used for the next page fetch.
type CursorExtractFunc[T any, C any] func(T) C

// ItemProcessFunc represents a function that processes individual items.
type ItemProcessFunc[T any] func(context.Context, T) error
170

171
// ExecutePaginatedQuery executes a cursor-based paginated query. It continues
172
// fetching pages until no more results are returned, processing each item with
173
// the provided callback.
174
//
175
// Parameters:
176
// - initialCursor: the starting cursor value (e.g., 0, -1, "", etc.).
177
// - queryFunc: function that fetches a page given cursor and limit.
178
// - extractCursor: function that extracts cursor from an item for next page.
179
// - processItem: function that processes each individual item.
180
//
181
// NOTE: it is the caller's responsibility to "undo" any processing done on
182
// items if the query fails on a later page.
183
func ExecutePaginatedQuery[C any, T any](ctx context.Context, cfg *QueryConfig,
184
        initialCursor C, queryFunc PagedQueryFunc[C, T],
185
        extractCursor CursorExtractFunc[T, C],
186
        processItem ItemProcessFunc[T]) error {
×
187

×
188
        cursor := initialCursor
×
189

×
190
        for {
×
191
                // Fetch the next page.
×
192
                items, err := queryFunc(ctx, cursor, int32(cfg.MaxPageSize))
×
193
                if err != nil {
×
194
                        return fmt.Errorf("failed to fetch page with "+
×
195
                                "cursor %v: %w", cursor, err)
×
196
                }
×
197

198
                // If no items returned, we're done.
199
                if len(items) == 0 {
×
200
                        break
×
201
                }
202

203
                // Process each item in the page.
204
                for _, item := range items {
×
205
                        if err := processItem(ctx, item); err != nil {
×
206
                                return fmt.Errorf("failed to process item: %w",
×
207
                                        err)
×
208
                        }
×
209

210
                        // Update cursor for next iteration.
211
                        cursor = extractCursor(item)
×
212
                }
213

214
                // If the number of items is less than the max page size,
215
                // we assume there are no more items to fetch.
216
                if len(items) < int(cfg.MaxPageSize) {
×
217
                        break
×
218
                }
219
        }
220

221
        return nil
×
222
}
223

224
// CollectAndBatchDataQueryFunc represents a function that batch loads
// additional data for collected identifiers, returning the batch data that
// applies to all items.
type CollectAndBatchDataQueryFunc[ID any, BatchData any] func(context.Context,
	[]ID) (BatchData, error)

// ItemWithBatchDataProcessFunc represents a function that processes individual
// items along with shared batch data.
type ItemWithBatchDataProcessFunc[T any, BatchData any] func(context.Context,
	T, BatchData) error

// CollectFunc represents a function that extracts an identifier from a
// paginated item.
type CollectFunc[T any, ID any] func(T) (ID, error)
238

239
// ExecuteCollectAndBatchWithSharedDataQuery implements a page-by-page
240
// processing pattern where each page is immediately processed with batch-loaded
241
// data before moving to the next page.
242
//
243
// It:
244
// 1. Fetches a page of items using cursor-based pagination
245
// 2. Collects identifiers from that page and batch loads shared data
246
// 3. Processes each item in the page with the shared batch data
247
// 4. Moves to the next page and repeats
248
//
249
// Parameters:
250
// - initialCursor: starting cursor for pagination
251
// - pageQueryFunc: fetches a page of items
252
// - extractPageCursor: extracts cursor from paginated item for next page
253
// - collectFunc: extracts identifier from paginated item
254
// - batchDataFunc: batch loads shared data from collected IDs for one page
255
// - processItem: processes each item with the shared batch data
256
func ExecuteCollectAndBatchWithSharedDataQuery[C any, T any, I any, D any](
257
        ctx context.Context, cfg *QueryConfig, initialCursor C,
258
        pageQueryFunc PagedQueryFunc[C, T],
259
        extractPageCursor CursorExtractFunc[T, C],
260
        collectFunc CollectFunc[T, I],
261
        batchDataFunc CollectAndBatchDataQueryFunc[I, D],
262
        processItem ItemWithBatchDataProcessFunc[T, D]) error {
×
263

×
264
        cursor := initialCursor
×
265

×
266
        for {
×
267
                // Step 1: Fetch the next page of items.
×
268
                items, err := pageQueryFunc(ctx, cursor, int32(cfg.MaxPageSize))
×
269
                if err != nil {
×
270
                        return fmt.Errorf("failed to fetch page with "+
×
271
                                "cursor %v: %w", cursor, err)
×
272
                }
×
273

274
                // If no items returned, we're done.
275
                if len(items) == 0 {
×
276
                        break
×
277
                }
278

279
                // Step 2: Collect identifiers from this page and batch load
280
                // data.
281
                pageIDs := make([]I, len(items))
×
282
                for i, item := range items {
×
283
                        pageIDs[i], err = collectFunc(item)
×
284
                        if err != nil {
×
285
                                return fmt.Errorf("failed to collect "+
×
286
                                        "identifier from item: %w", err)
×
287
                        }
×
288
                }
289

290
                // Batch load shared data for this page.
291
                batchData, err := batchDataFunc(ctx, pageIDs)
×
292
                if err != nil {
×
293
                        return fmt.Errorf("failed to load batch data for "+
×
294
                                "page: %w", err)
×
295
                }
×
296

297
                // Step 3: Process each item in this page with the shared batch
298
                // data.
299
                for _, item := range items {
×
300
                        err := processItem(ctx, item, batchData)
×
301
                        if err != nil {
×
302
                                return fmt.Errorf("failed to process item "+
×
303
                                        "with batch data: %w", err)
×
304
                        }
×
305

306
                        // Update cursor for next page.
307
                        cursor = extractPageCursor(item)
×
308
                }
309

310
                // If the number of items is less than the max page size,
311
                // we assume there are no more items to fetch.
312
                if len(items) < int(cfg.MaxPageSize) {
×
313
                        break
×
314
                }
315
        }
316

317
        return nil
×
318
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc