• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13975285956

20 Mar 2025 05:12PM UTC coverage: 68.667%. First build
13975285956

Pull #9609

github

web-flow
Merge c842aa78c into bcc80e7f9
Pull Request #9609: Fix inaccurate `listunspent` result

29 of 38 new or added lines in 8 files covered. (76.32%)

130412 of 189920 relevant lines covered (68.67%)

23529.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.49
/sqldb/interfaces.go
1
package sqldb
2

3
import (
4
        "context"
5
        "database/sql"
6
        "fmt"
7
        "math"
8
        "math/rand"
9
        prand "math/rand"
10
        "time"
11

12
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
13
)
14

15
var (
16
        // DefaultStoreTimeout is the default timeout used for any interaction
17
        // with the storage/database.
18
        DefaultStoreTimeout = time.Second * 10
19
)
20

21
const (
22
        // DefaultNumTxRetries is the default number of times we'll retry a
23
        // transaction if it fails with an error that permits transaction
24
        // repetition.
25
        DefaultNumTxRetries = 20
26

27
        // DefaultRetryDelay is the default delay between retries. This will be
28
        // used to generate a random delay between 0 and this value.
29
        DefaultRetryDelay = time.Millisecond * 50
30

31
        // DefaultMaxRetryDelay is the default maximum delay between retries.
32
        DefaultMaxRetryDelay = time.Second
33
)
34

35
// TxOptions represents a set of options one can use to control what type of
36
// database transaction is created. Transaction can be either read or write.
37
type TxOptions interface {
38
        // ReadOnly returns true if the transaction should be read only.
39
        ReadOnly() bool
40
}
41

42
// BatchedTx is a generic interface that represents the ability to execute
43
// several operations to a given storage interface in a single atomic
44
// transaction. Typically, Q here will be some subset of the main sqlc.Querier
45
// interface allowing it to only depend on the routines it needs to implement
46
// any additional business logic.
47
type BatchedTx[Q any] interface {
48
        // ExecTx will execute the passed txBody, operating upon generic
49
        // parameter Q (usually a storage interface) in a single transaction.
50
        //
51
        // The set of TxOptions are passed in order to allow the caller to
52
        // specify if a transaction should be read-only and optionally what
53
        // type of concurrency control should be used.
54
        ExecTx(ctx context.Context, txOptions TxOptions,
55
                txBody func(Q) error, reset func()) error
56
}
57

58
// Tx represents a database transaction that can be committed or rolled back.
59
type Tx interface {
60
        // Commit commits the database transaction, an error should be returned
61
        // if the commit isn't possible.
62
        Commit() error
63

64
        // Rollback rolls back an incomplete database transaction.
65
        // Transactions that were able to be committed can still call this as a
66
        // noop.
67
        Rollback() error
68
}
69

70
// QueryCreator is a generic function that's used to create a Querier, which is
71
// a type of interface that implements storage related methods from a database
72
// transaction. This will be used to instantiate an object callers can use to
73
// apply multiple modifications to an object interface in a single atomic
74
// transaction.
75
type QueryCreator[Q any] func(*sql.Tx) Q
76

77
// BatchedQuerier is a generic interface that allows callers to create a new
78
// database transaction based on an abstract type that implements the TxOptions
79
// interface.
80
type BatchedQuerier interface {
81
        // Querier is the underlying query source, this is in place so we can
82
        // pass a BatchedQuerier implementation directly into objects that
83
        // create a batched version of the normal methods they need.
84
        sqlc.Querier
85

86
        // BeginTx creates a new database transaction given the set of
87
        // transaction options.
88
        BeginTx(ctx context.Context, options TxOptions) (*sql.Tx, error)
89
}
90

91
// txExecutorOptions is a struct that holds the options for the transaction
92
// executor. This can be used to do things like retry a transaction due to an
93
// error a certain amount of times.
94
type txExecutorOptions struct {
95
        numRetries int
96
        retryDelay time.Duration
97
}
98

99
// defaultTxExecutorOptions returns the default options for the transaction
100
// executor.
101
func defaultTxExecutorOptions() *txExecutorOptions {
1,046✔
102
        return &txExecutorOptions{
1,046✔
103
                numRetries: DefaultNumTxRetries,
1,046✔
104
                retryDelay: DefaultRetryDelay,
1,046✔
105
        }
1,046✔
106
}
1,046✔
107

108
// randRetryDelay returns a random retry delay between 0 and the configured max
109
// delay.
110
func (t *txExecutorOptions) randRetryDelay() time.Duration {
×
111
        return time.Duration(prand.Int63n(int64(t.retryDelay))) //nolint:gosec
×
112
}
×
113

114
// TxExecutorOption is a functional option that allows us to pass in optional
115
// argument when creating the executor.
116
type TxExecutorOption func(*txExecutorOptions)
117

118
// WithTxRetries is a functional option that allows us to specify the number of
119
// times a transaction should be retried if it fails with a repeatable error.
120
func WithTxRetries(numRetries int) TxExecutorOption {
×
121
        return func(o *txExecutorOptions) {
×
122
                o.numRetries = numRetries
×
123
        }
×
124
}
125

126
// WithTxRetryDelay is a functional option that allows us to specify the delay
127
// to wait before a transaction is retried.
128
func WithTxRetryDelay(delay time.Duration) TxExecutorOption {
×
129
        return func(o *txExecutorOptions) {
×
130
                o.retryDelay = delay
×
131
        }
×
132
}
133

134
// TransactionExecutor is a generic struct that abstracts away from the type of
135
// query a type needs to run under a database transaction, and also the set of
136
// options for that transaction. The QueryCreator is used to create a query
137
// given a database transaction created by the BatchedQuerier.
138
type TransactionExecutor[Query any] struct {
139
        BatchedQuerier
140

141
        createQuery QueryCreator[Query]
142

143
        opts *txExecutorOptions
144
}
145

146
// NewTransactionExecutor creates a new instance of a TransactionExecutor given
147
// a Querier query object and a concrete type for the type of transactions the
148
// Querier understands.
149
func NewTransactionExecutor[Querier any](db BatchedQuerier,
150
        createQuery QueryCreator[Querier],
151
        opts ...TxExecutorOption) *TransactionExecutor[Querier] {
1,046✔
152

1,046✔
153
        txOpts := defaultTxExecutorOptions()
1,046✔
154
        for _, optFunc := range opts {
1,046✔
155
                optFunc(txOpts)
×
156
        }
×
157

158
        return &TransactionExecutor[Querier]{
1,046✔
159
                BatchedQuerier: db,
1,046✔
160
                createQuery:    createQuery,
1,046✔
161
                opts:           txOpts,
1,046✔
162
        }
1,046✔
163
}
164

165
// randRetryDelay returns a random retry delay between -50% and +50% of the
166
// configured delay that is doubled for each attempt and capped at a max value.
167
func randRetryDelay(initialRetryDelay, maxRetryDelay time.Duration,
168
        attempt int) time.Duration {
1✔
169

1✔
170
        halfDelay := initialRetryDelay / 2
1✔
171
        randDelay := rand.Int63n(int64(initialRetryDelay)) //nolint:gosec
1✔
172

1✔
173
        // 50% plus 0%-100% gives us the range of 50%-150%.
1✔
174
        initialDelay := halfDelay + time.Duration(randDelay)
1✔
175

1✔
176
        // If this is the first attempt, we just return the initial delay.
1✔
177
        if attempt == 0 {
2✔
178
                return initialDelay
1✔
179
        }
1✔
180

181
        // For each subsequent delay, we double the initial delay. This still
182
        // gives us a somewhat random delay, but it still increases with each
183
        // attempt. If we double something n times, that's the same as
184
        // multiplying the value with 2^n. We limit the power to 32 to avoid
185
        // overflows.
186
        factor := time.Duration(math.Pow(2, min(float64(attempt), 32)))
×
187
        actualDelay := initialDelay * factor
×
188

×
189
        // Cap the delay at the maximum configured value.
×
190
        if actualDelay > maxRetryDelay {
×
191
                return maxRetryDelay
×
192
        }
×
193

194
        return actualDelay
×
195
}
196

197
// MakeTx is a function that creates a new transaction. It returns a Tx and an
198
// error if the transaction cannot be created. This is used to abstract the
199
// creation of a transaction from the actual transaction logic in order to be
200
// able to reuse the transaction retry logic in other packages.
201
type MakeTx func() (Tx, error)
202

203
// TxBody represents the function type for transactions. It returns an
204
// error to indicate success or failure.
205
type TxBody func(tx Tx) error
206

207
// RollbackTx is a function that is called when a transaction needs to be rolled
208
// back due to a serialization error. By using this intermediate function, we
209
// can avoid having to return rollback errors that are not actionable by the
210
// caller.
211
type RollbackTx func(tx Tx) error
212

213
// OnBackoff is a function that is called when a transaction is retried due to a
214
// serialization error. The function is called with the retry attempt number and
215
// the delay before the next retry.
216
type OnBackoff func(retry int, delay time.Duration)
217

218
// ExecuteSQLTransactionWithRetry is a helper function that executes a
219
// transaction with retry logic. It will retry the transaction if it fails with
220
// a serialization error. The function will return an error if the transaction
221
// fails with a non-retryable error, the context is cancelled or the number of
222
// retries is exceeded.
223
func ExecuteSQLTransactionWithRetry(ctx context.Context, makeTx MakeTx,
224
        rollbackTx RollbackTx, txBody TxBody, onBackoff OnBackoff,
225
        numRetries int) error {
26,408✔
226

26,408✔
227
        waitBeforeRetry := func(attemptNumber int) bool {
26,409✔
228
                retryDelay := randRetryDelay(
1✔
229
                        DefaultRetryDelay, DefaultMaxRetryDelay, attemptNumber,
1✔
230
                )
1✔
231

1✔
232
                onBackoff(attemptNumber, retryDelay)
1✔
233

1✔
234
                select {
1✔
235
                // Before we try again, we'll wait with a random backoff based
236
                // on the retry delay.
237
                case <-time.After(retryDelay):
1✔
238
                        return true
1✔
239

240
                // If the daemon is shutting down, then we'll exit early.
241
                case <-ctx.Done():
×
242
                        return false
×
243
                }
244
        }
245

246
        for i := 0; i < numRetries; i++ {
52,817✔
247
                tx, err := makeTx()
26,409✔
248
                if err != nil {
26,409✔
249
                        dbErr := MapSQLError(err)
×
NEW
250
                        log.Tracef("Failed to makeTx: err=%v, dbErr=%v", err,
×
NEW
251
                                dbErr)
×
NEW
252

×
253
                        if IsSerializationError(dbErr) {
×
254
                                // Nothing to roll back here, since we haven't
×
255
                                // even get a transaction yet. We'll just wait
×
256
                                // and try again.
257
                                if waitBeforeRetry(i) {
258
                                        continue
259
                                }
×
260
                        }
261

262
                        return dbErr
263
                }
264

52,818✔
265
                // Rollback is safe to call even if the tx is already closed,
26,409✔
266
                // so if the tx commits successfully, this is a no-op.
26,409✔
267
                defer func() {
268
                        _ = tx.Rollback()
26,510✔
269
                }()
101✔
270

101✔
271
                if bodyErr := txBody(tx); bodyErr != nil {
101✔
272
                        log.Tracef("Error in txBody: %v", bodyErr)
101✔
NEW
273

×
274
                        // Roll back the transaction, then attempt a random
×
275
                        // backoff and try again if the error was a
276
                        // serialization error.
101✔
277
                        if err := rollbackTx(tx); err != nil {
102✔
278
                                return MapSQLError(err)
2✔
279
                        }
1✔
280

281
                        dbErr := MapSQLError(bodyErr)
282
                        if IsSerializationError(dbErr) {
283
                                if waitBeforeRetry(i) {
100✔
284
                                        continue
285
                                }
286
                        }
287

26,308✔
288
                        return dbErr
×
289
                }
×
290

×
291
                // Commit transaction.
×
292
                if commitErr := tx.Commit(); commitErr != nil {
×
NEW
293
                        log.Tracef("Failed to commit tx: %v", commitErr)
×
294

295
                        // Roll back the transaction, then attempt a random
×
296
                        // backoff and try again if the error was a
×
297
                        // serialization error.
×
298
                        if err := rollbackTx(tx); err != nil {
×
299
                                return MapSQLError(err)
300
                        }
301

302
                        dbErr := MapSQLError(commitErr)
×
303
                        if IsSerializationError(dbErr) {
304
                                if waitBeforeRetry(i) {
305
                                        continue
26,308✔
306
                                }
307
                        }
308

309
                        return dbErr
310
                }
×
311

312
                return nil
313
        }
314

315
        // If we get to this point, then we weren't able to successfully commit
316
        // a tx given the max number of retries.
317
        return ErrRetriesExceeded
318
}
319

320
// ExecTx is a wrapper for txBody to abstract the creation and commit of a db
26,408✔
321
// transaction. The db transaction is embedded in a `*Queries` that txBody
26,408✔
322
// needs to use when executing each one of the queries that need to be applied
52,817✔
323
// atomically. This can be used by other storage interfaces to parameterize the
26,409✔
324
// type of query and options run, in order to have access to batched operations
26,409✔
325
// related to a storage object.
326
func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context,
52,817✔
327
        txOptions TxOptions, txBody func(Q) error, reset func()) error {
26,409✔
328

26,409✔
329
        makeTx := func() (Tx, error) {
×
330
                return t.BatchedQuerier.BeginTx(ctx, txOptions)
×
331
        }
332

26,409✔
333
        execTxBody := func(tx Tx) error {
26,409✔
334
                sqlTx, ok := tx.(*sql.Tx)
335
                if !ok {
336
                        return fmt.Errorf("expected *sql.Tx, got %T", tx)
26,409✔
337
                }
1✔
338

1✔
339
                reset()
1✔
340
                return txBody(t.createQuery(sqlTx))
341
        }
26,509✔
342

101✔
343
        onBackoff := func(retry int, delay time.Duration) {
101✔
344
                log.Tracef("Retrying transaction due to tx serialization "+
×
345
                        "error, attempt_number=%v, delay=%v", retry, delay)
×
346
        }
347

101✔
348
        rollbackTx := func(tx Tx) error {
101✔
349
                sqlTx, ok := tx.(*sql.Tx)
101✔
350
                if !ok {
351
                        return fmt.Errorf("expected *sql.Tx, got %T", tx)
352
                }
26,408✔
353

26,408✔
354
                _ = sqlTx.Rollback()
26,408✔
355

26,408✔
356
                return nil
357
        }
358

359
        return ExecuteSQLTransactionWithRetry(
360
                ctx, makeTx, rollbackTx, execTxBody, onBackoff,
361
                t.opts.numRetries,
362
        )
363
}
364

365
// DB is an interface that represents a generic SQL database. It provides
366
// methods to apply migrations and access the underlying database connection.
367
type DB interface {
368
        // GetBaseDB returns the underlying BaseDB instance.
369
        GetBaseDB() *BaseDB
370

371
        // ApplyAllMigrations applies all migrations to the database including
372
        // both sqlc and custom in-code migrations.
373
        ApplyAllMigrations(ctx context.Context,
374
                customMigrations []MigrationConfig) error
375
}
376

377
// BaseDB is the base database struct that each implementation can embed to
378
// gain some common functionality.
379
type BaseDB struct {
380
        *sql.DB
381

26,409✔
382
        *sqlc.Queries
26,409✔
383
}
26,409✔
384

26,409✔
385
// BeginTx wraps the normal sql specific BeginTx method with the TxOptions
26,409✔
386
// interface. This interface is then mapped to the concrete sql tx options
26,409✔
387
// struct.
26,409✔
388
func (s *BaseDB) BeginTx(ctx context.Context, opts TxOptions) (*sql.Tx, error) {
26,409✔
389
        sqlOptions := sql.TxOptions{
390
                Isolation: sql.LevelSerializable,
391
                ReadOnly:  opts.ReadOnly(),
392
        }
393

394
        return s.DB.BeginTx(ctx, &sqlOptions)
395
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc