• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16647329373

31 Jul 2025 11:08AM UTC coverage: 66.99% (-0.05%) from 67.044%
16647329373

Pull #10118

github

web-flow
Merge 369b2892f into b5c290d90
Pull Request #10118: [4] sqldb+graph/db: add and use new pagination & batch query helpers

6 of 410 new or added lines in 2 files covered. (1.46%)

128 existing lines in 27 files now uncovered.

135487 of 202249 relevant lines covered (66.99%)

21660.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.52
/sqldb/paginate.go
1
package sqldb
2

3
import (
4
        "context"
5
        "fmt"
6
)
7

8
// QueryConfig holds configuration values for SQL queries.
9
type QueryConfig struct {
10
        // MaxBatchSize is the maximum number of items included in a batch
11
        // query IN clauses list.
12
        MaxBatchSize int
13

14
        // MaxPageSize is the maximum number of items returned in a single page
15
        // of results. This is used for paginated queries.
16
        MaxPageSize int32
17
}
18

19
// DefaultQueryConfig returns a default configuration for batched queries.
20
//
21
// TODO(elle): make configurable & have different defaults for SQLite and
22
// Postgres.
23
func DefaultQueryConfig() *QueryConfig {
1✔
24
        return &QueryConfig{
1✔
25
                MaxBatchSize: 250,
1✔
26
                MaxPageSize:  10000,
1✔
27
        }
1✔
28
}
1✔
29

30
// BatchQueryFunc represents a function that takes a batch of converted items
31
// and returns results.
32
type BatchQueryFunc[T any, R any] func(context.Context, []T) ([]R, error)
33

34
// ItemCallbackFunc represents a function that processes individual results.
35
type ItemCallbackFunc[R any] func(context.Context, R) error
36

37
// ConvertFunc represents a function that converts from input type to query type
38
// for the batch query.
39
type ConvertFunc[I any, T any] func(I) T
40

41
// ExecuteBatchQuery executes a query in batches over a slice of input items.
42
// It converts the input items to a query type using the provided convertFunc,
43
// executes the query in batches using the provided queryFunc, and applies
44
// the callback to each result. This is useful for queries using the
45
// "WHERE x IN []slice" pattern. It takes that slice, splits it into batches of
46
// size MaxBatchSize, and executes the query for each batch.
47
//
48
// NOTE: it is the caller's responsibility to ensure that the expected return
49
// results are unique across all pages. Meaning that if the input items are
50
// split up, a result that is returned in one page should not be expected to
51
// be returned in another page.
52
func ExecuteBatchQuery[I any, T any, R any](ctx context.Context,
53
        cfg *QueryConfig, inputItems []I, convertFunc ConvertFunc[I, T],
NEW
54
        queryFunc BatchQueryFunc[T, R], callback ItemCallbackFunc[R]) error {
×
55

×
56
        if len(inputItems) == 0 {
×
57
                return nil
×
58
        }
×
59

60
        // Process items in pages.
NEW
61
        for i := 0; i < len(inputItems); i += cfg.MaxBatchSize {
×
62
                // Calculate the end index for this page.
×
NEW
63
                end := i + cfg.MaxBatchSize
×
64
                if end > len(inputItems) {
×
65
                        end = len(inputItems)
×
66
                }
×
67

68
                // Get the page slice of input items.
69
                inputPage := inputItems[i:end]
×
70

×
71
                // Convert only the items needed for this page.
×
72
                convertedPage := make([]T, len(inputPage))
×
73
                for j, inputItem := range inputPage {
×
74
                        convertedPage[j] = convertFunc(inputItem)
×
75
                }
×
76

77
                // Execute the query for this page.
78
                results, err := queryFunc(ctx, convertedPage)
×
79
                if err != nil {
×
80
                        return fmt.Errorf("query failed for page "+
×
81
                                "starting at %d: %w", i, err)
×
82
                }
×
83

84
                // Apply the callback to each result.
85
                for _, result := range results {
×
86
                        if err := callback(ctx, result); err != nil {
×
87
                                return fmt.Errorf("callback failed for "+
×
88
                                        "result: %w", err)
×
89
                        }
×
90
                }
91
        }
92

93
        return nil
×
94
}
95

96
// PagedQueryFunc represents a function that fetches a page of results using a
97
// cursor. It returns the fetched items and should return an empty slice when no
98
// more results.
99
type PagedQueryFunc[C any, T any] func(context.Context, C, int32) ([]T, error)
100

101
// CursorExtractFunc represents a function that extracts the cursor value from
102
// an item. This cursor will be used for the next page fetch.
103
type CursorExtractFunc[T any, C any] func(T) C
104

105
// ItemProcessFunc represents a function that processes individual items.
106
type ItemProcessFunc[T any] func(context.Context, T) error
107

108
// ExecutePaginatedQuery executes a cursor-based paginated query. It continues
109
// fetching pages until no more results are returned, processing each item with
110
// the provided callback.
111
//
112
// Parameters:
113
// - initialCursor: the starting cursor value (e.g., 0, -1, "", etc.).
114
// - queryFunc: function that fetches a page given cursor and limit.
115
// - extractCursor: function that extracts cursor from an item for next page.
116
// - processItem: function that processes each individual item.
117
//
118
// NOTE: it is the caller's responsibility to "undo" any processing done on
119
// items if the query fails on a later page.
120
func ExecutePaginatedQuery[C any, T any](ctx context.Context, cfg *QueryConfig,
121
        initialCursor C, queryFunc PagedQueryFunc[C, T],
122
        extractCursor CursorExtractFunc[T, C],
NEW
123
        processItem ItemProcessFunc[T]) error {
×
NEW
124

×
NEW
125
        cursor := initialCursor
×
NEW
126

×
NEW
127
        for {
×
NEW
128
                // Fetch the next page.
×
NEW
129
                items, err := queryFunc(ctx, cursor, cfg.MaxPageSize)
×
NEW
130
                if err != nil {
×
NEW
131
                        return fmt.Errorf("failed to fetch page with "+
×
NEW
132
                                "cursor %v: %w", cursor, err)
×
NEW
133
                }
×
134

135
                // If no items returned, we're done.
NEW
136
                if len(items) == 0 {
×
NEW
137
                        break
×
138
                }
139

140
                // Process each item in the page.
NEW
141
                for _, item := range items {
×
NEW
142
                        if err := processItem(ctx, item); err != nil {
×
NEW
143
                                return fmt.Errorf("failed to process item: %w",
×
NEW
144
                                        err)
×
NEW
145
                        }
×
146

147
                        // Update cursor for next iteration.
NEW
148
                        cursor = extractCursor(item)
×
149
                }
150

151
                // If the number of items is less than the max page size,
152
                // we assume there are no more items to fetch.
NEW
153
                if len(items) < int(cfg.MaxPageSize) {
×
NEW
154
                        break
×
155
                }
156
        }
157

NEW
158
        return nil
×
159
}
160

161
// CollectAndBatchDataQueryFunc represents a function that batch loads
162
// additional data for collected identifiers, returning the batch data that
163
// applies to all items.
164
type CollectAndBatchDataQueryFunc[ID any, BatchData any] func(context.Context,
165
        []ID) (BatchData, error)
166

167
// ItemWithBatchDataProcessFunc represents a function that processes individual
168
// items along with shared batch data.
169
type ItemWithBatchDataProcessFunc[T any, BatchData any] func(context.Context,
170
        T, BatchData) error
171

172
// CollectFunc represents a function that extracts an identifier from a
173
// paginated item.
174
type CollectFunc[T any, ID any] func(T) (ID, error)
175

176
// ExecuteCollectAndBatchWithSharedDataQuery implements a page-by-page
177
// processing pattern where each page is immediately processed with batch-loaded
178
// data before moving to the next page.
179
//
180
// It:
181
// 1. Fetches a page of items using cursor-based pagination
182
// 2. Collects identifiers from that page and batch loads shared data
183
// 3. Processes each item in the page with the shared batch data
184
// 4. Moves to the next page and repeats
185
//
186
// Parameters:
187
// - initialCursor: starting cursor for pagination
188
// - pageQueryFunc: fetches a page of items
189
// - extractPageCursor: extracts cursor from paginated item for next page
190
// - collectFunc: extracts identifier from paginated item
191
// - batchDataFunc: batch loads shared data from collected IDs for one page
192
// - processItem: processes each item with the shared batch data
193
func ExecuteCollectAndBatchWithSharedDataQuery[C any, T any, I any, D any](
194
        ctx context.Context, cfg *QueryConfig, initialCursor C,
195
        pageQueryFunc PagedQueryFunc[C, T],
196
        extractPageCursor CursorExtractFunc[T, C],
197
        collectFunc CollectFunc[T, I],
198
        batchDataFunc CollectAndBatchDataQueryFunc[I, D],
NEW
199
        processItem ItemWithBatchDataProcessFunc[T, D]) error {
×
NEW
200

×
NEW
201
        cursor := initialCursor
×
NEW
202

×
NEW
203
        for {
×
NEW
204
                // Step 1: Fetch the next page of items.
×
NEW
205
                items, err := pageQueryFunc(ctx, cursor, cfg.MaxPageSize)
×
NEW
206
                if err != nil {
×
NEW
207
                        return fmt.Errorf("failed to fetch page with "+
×
NEW
208
                                "cursor %v: %w", cursor, err)
×
NEW
209
                }
×
210

211
                // If no items returned, we're done.
NEW
212
                if len(items) == 0 {
×
NEW
213
                        break
×
214
                }
215

216
                // Step 2: Collect identifiers from this page and batch load
217
                // data.
NEW
218
                pageIDs := make([]I, len(items))
×
NEW
219
                for i, item := range items {
×
NEW
220
                        pageIDs[i], err = collectFunc(item)
×
NEW
221
                        if err != nil {
×
NEW
222
                                return fmt.Errorf("failed to collect "+
×
NEW
223
                                        "identifier from item: %w", err)
×
NEW
224
                        }
×
225
                }
226

227
                // Batch load shared data for this page.
NEW
228
                batchData, err := batchDataFunc(ctx, pageIDs)
×
NEW
229
                if err != nil {
×
NEW
230
                        return fmt.Errorf("failed to load batch data for "+
×
NEW
231
                                "page: %w", err)
×
NEW
232
                }
×
233

234
                // Step 3: Process each item in this page with the shared batch
235
                // data.
NEW
236
                for _, item := range items {
×
NEW
237
                        err := processItem(ctx, item, batchData)
×
NEW
238
                        if err != nil {
×
NEW
239
                                return fmt.Errorf("failed to process item "+
×
NEW
240
                                        "with batch data: %w", err)
×
NEW
241
                        }
×
242

243
                        // Update cursor for next page.
NEW
244
                        cursor = extractPageCursor(item)
×
245
                }
246

247
                // If the number of items is less than the max page size,
248
                // we assume there are no more items to fetch.
NEW
249
                if len(items) < int(cfg.MaxPageSize) {
×
NEW
250
                        break
×
251
                }
252
        }
253

NEW
254
        return nil
×
255
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc