• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16501468389

24 Jul 2025 03:42PM UTC coverage: 67.2% (-0.03%) from 67.227%
16501468389

Pull #10090

github

web-flow
Merge 27f90d17a into 6f09d9653
Pull Request #10090: update golang dependencies

135426 of 201527 relevant lines covered (67.2%)

21680.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.37
/sqldb/interfaces.go
1
package sqldb
2

3
import (
4
        "context"
5
        "database/sql"
6
        "fmt"
7
        "math"
8
        "math/rand"
9
        prand "math/rand"
10
        "time"
11

12
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
13
)
14

15
var (
16
        // DefaultStoreTimeout is the default timeout used for any interaction
17
        // with the storage/database.
18
        DefaultStoreTimeout = time.Second * 10
19
)
20

21
const (
22
        // DefaultNumTxRetries is the default number of times we'll retry a
23
        // transaction if it fails with an error that permits transaction
24
        // repetition.
25
        DefaultNumTxRetries = 20
26

27
        // DefaultRetryDelay is the default delay between retries. This will be
28
        // used to generate a random delay between 0 and this value.
29
        DefaultRetryDelay = time.Millisecond * 50
30

31
        // DefaultMaxRetryDelay is the default maximum delay between retries.
32
        DefaultMaxRetryDelay = time.Second
33
)
34

35
// TxOptions represents a set of options one can use to control what type of
36
// database transaction is created. Transaction can be either read or write.
37
type TxOptions interface {
38
        // ReadOnly returns true if the transaction should be read only.
39
        ReadOnly() bool
40
}
41

42
// txOptions is a concrete implementation of the TxOptions interface.
43
type txOptions struct {
44
        // readOnly indicates if the transaction should be read-only.
45
        readOnly bool
46
}
47

48
// ReadOnly returns true if the transaction should be read only.
49
//
50
// NOTE: This is part of the TxOptions interface.
51
func (t *txOptions) ReadOnly() bool {
37,530✔
52
        return t.readOnly
37,530✔
53
}
37,530✔
54

55
// WriteTxOpt returns a TxOptions that indicates that the transaction
56
// should be a write transaction.
57
func WriteTxOpt() TxOptions {
10,491✔
58
        return &txOptions{
10,491✔
59
                readOnly: false,
10,491✔
60
        }
10,491✔
61
}
10,491✔
62

63
// ReadTxOpt returns a TxOptions that indicates that the transaction
64
// should be a read-only transaction.
65
func ReadTxOpt() TxOptions {
26,656✔
66
        return &txOptions{
26,656✔
67
                readOnly: true,
26,656✔
68
        }
26,656✔
69
}
26,656✔
70

71
// BatchedTx is a generic interface that represents the ability to execute
72
// several operations to a given storage interface in a single atomic
73
// transaction. Typically, Q here will be some subset of the main sqlc.Querier
74
// interface allowing it to only depend on the routines it needs to implement
75
// any additional business logic.
76
type BatchedTx[Q any] interface {
77
        // ExecTx will execute the passed txBody, operating upon generic
78
        // parameter Q (usually a storage interface) in a single transaction.
79
        //
80
        // The set of TxOptions are passed in order to allow the caller to
81
        // specify if a transaction should be read-only and optionally what
82
        // type of concurrency control should be used.
83
        ExecTx(ctx context.Context, txOptions TxOptions,
84
                txBody func(Q) error, reset func()) error
85
}
86

87
// Tx represents a database transaction that can be committed or rolled back.
88
type Tx interface {
89
        // Commit commits the database transaction, an error should be returned
90
        // if the commit isn't possible.
91
        Commit() error
92

93
        // Rollback rolls back an incomplete database transaction.
94
        // Transactions that were able to be committed can still call this as a
95
        // noop.
96
        Rollback() error
97
}
98

99
// QueryCreator is a generic function that's used to create a Querier, which is
100
// a type of interface that implements storage related methods from a database
101
// transaction. This will be used to instantiate an object callers can use to
102
// apply multiple modifications to an object interface in a single atomic
103
// transaction.
104
type QueryCreator[Q any] func(*sql.Tx) Q
105

106
// BatchedQuerier is a generic interface that allows callers to create a new
107
// database transaction based on an abstract type that implements the TxOptions
108
// interface.
109
type BatchedQuerier interface {
110
        // Querier is the underlying query source, this is in place so we can
111
        // pass a BatchedQuerier implementation directly into objects that
112
        // create a batched version of the normal methods they need.
113
        sqlc.Querier
114

115
        // BeginTx creates a new database transaction given the set of
116
        // transaction options.
117
        BeginTx(ctx context.Context, options TxOptions) (*sql.Tx, error)
118
}
119

120
// txExecutorOptions is a struct that holds the options for the transaction
121
// executor. This can be used to do things like retry a transaction due to an
122
// error a certain amount of times.
123
type txExecutorOptions struct {
124
        numRetries int
125
        retryDelay time.Duration
126
}
127

128
// defaultTxExecutorOptions returns the default options for the transaction
129
// executor.
130
func defaultTxExecutorOptions() *txExecutorOptions {
1,058✔
131
        return &txExecutorOptions{
1,058✔
132
                numRetries: DefaultNumTxRetries,
1,058✔
133
                retryDelay: DefaultRetryDelay,
1,058✔
134
        }
1,058✔
135
}
1,058✔
136

137
// randRetryDelay returns a random retry delay between 0 and the configured max
138
// delay.
139
func (t *txExecutorOptions) randRetryDelay() time.Duration {
×
140
        return time.Duration(prand.Int63n(int64(t.retryDelay))) //nolint:gosec
×
141
}
×
142

143
// TxExecutorOption is a functional option that allows us to pass in optional
144
// argument when creating the executor.
145
type TxExecutorOption func(*txExecutorOptions)
146

147
// WithTxRetries is a functional option that allows us to specify the number of
148
// times a transaction should be retried if it fails with a repeatable error.
149
func WithTxRetries(numRetries int) TxExecutorOption {
×
150
        return func(o *txExecutorOptions) {
×
151
                o.numRetries = numRetries
×
152
        }
×
153
}
154

155
// WithTxRetryDelay is a functional option that allows us to specify the delay
156
// to wait before a transaction is retried.
157
func WithTxRetryDelay(delay time.Duration) TxExecutorOption {
×
158
        return func(o *txExecutorOptions) {
×
159
                o.retryDelay = delay
×
160
        }
×
161
}
162

163
// TransactionExecutor is a generic struct that abstracts away from the type of
164
// query a type needs to run under a database transaction, and also the set of
165
// options for that transaction. The QueryCreator is used to create a query
166
// given a database transaction created by the BatchedQuerier.
167
type TransactionExecutor[Query any] struct {
168
        BatchedQuerier
169

170
        createQuery QueryCreator[Query]
171

172
        opts *txExecutorOptions
173
}
174

175
// NewTransactionExecutor creates a new instance of a TransactionExecutor given
176
// a Querier query object and a concrete type for the type of transactions the
177
// Querier understands.
178
func NewTransactionExecutor[Querier any](db BatchedQuerier,
179
        createQuery QueryCreator[Querier],
180
        opts ...TxExecutorOption) *TransactionExecutor[Querier] {
1,058✔
181

1,058✔
182
        txOpts := defaultTxExecutorOptions()
1,058✔
183
        for _, optFunc := range opts {
1,058✔
184
                optFunc(txOpts)
×
185
        }
×
186

187
        return &TransactionExecutor[Querier]{
1,058✔
188
                BatchedQuerier: db,
1,058✔
189
                createQuery:    createQuery,
1,058✔
190
                opts:           txOpts,
1,058✔
191
        }
1,058✔
192
}
193

194
// randRetryDelay returns a random retry delay between -50% and +50% of the
195
// configured delay that is doubled for each attempt and capped at a max value.
196
func randRetryDelay(initialRetryDelay, maxRetryDelay time.Duration,
197
        attempt int) time.Duration {
×
198

×
199
        halfDelay := initialRetryDelay / 2
×
200
        randDelay := rand.Int63n(int64(initialRetryDelay)) //nolint:gosec
×
201

×
202
        // 50% plus 0%-100% gives us the range of 50%-150%.
×
203
        initialDelay := halfDelay + time.Duration(randDelay)
×
204

×
205
        // If this is the first attempt, we just return the initial delay.
×
206
        if attempt == 0 {
×
207
                return initialDelay
×
208
        }
×
209

210
        // For each subsequent delay, we double the initial delay. This still
211
        // gives us a somewhat random delay, but it still increases with each
212
        // attempt. If we double something n times, that's the same as
213
        // multiplying the value with 2^n. We limit the power to 32 to avoid
214
        // overflows.
215
        factor := time.Duration(math.Pow(2, min(float64(attempt), 32)))
×
216
        actualDelay := initialDelay * factor
×
217

×
218
        // Cap the delay at the maximum configured value.
×
219
        if actualDelay > maxRetryDelay {
×
220
                return maxRetryDelay
×
221
        }
×
222

223
        return actualDelay
×
224
}
225

226
// MakeTx is a function that creates a new transaction. It returns a Tx and an
227
// error if the transaction cannot be created. This is used to abstract the
228
// creation of a transaction from the actual transaction logic in order to be
229
// able to reuse the transaction retry logic in other packages.
230
type MakeTx func() (Tx, error)
231

232
// TxBody represents the function type for transactions. It returns an
233
// error to indicate success or failure.
234
type TxBody func(tx Tx) error
235

236
// RollbackTx is a function that is called when a transaction needs to be rolled
237
// back due to a serialization error. By using this intermediate function, we
238
// can avoid having to return rollback errors that are not actionable by the
239
// caller.
240
type RollbackTx func(tx Tx) error
241

242
// OnBackoff is a function that is called when a transaction is retried due to a
243
// serialization error. The function is called with the retry attempt number and
244
// the delay before the next retry.
245
type OnBackoff func(retry int, delay time.Duration)
246

247
// ExecuteSQLTransactionWithRetry is a helper function that executes a
248
// transaction with retry logic. It will retry the transaction if it fails with
249
// a serialization error. The function will return an error if the transaction
250
// fails with a non-retryable error, the context is cancelled or the number of
251
// retries is exceeded.
252
func ExecuteSQLTransactionWithRetry(ctx context.Context, makeTx MakeTx,
253
        rollbackTx RollbackTx, txBody TxBody, onBackoff OnBackoff,
254
        numRetries int) error {
26,685✔
255

26,685✔
256
        waitBeforeRetry := func(attemptNumber int) bool {
26,685✔
257
                retryDelay := randRetryDelay(
×
258
                        DefaultRetryDelay, DefaultMaxRetryDelay, attemptNumber,
×
259
                )
×
260

×
261
                onBackoff(attemptNumber, retryDelay)
×
262

×
263
                select {
×
264
                // Before we try again, we'll wait with a random backoff based
265
                // on the retry delay.
266
                case <-time.After(retryDelay):
×
267
                        return true
×
268

269
                // If the daemon is shutting down, then we'll exit early.
270
                case <-ctx.Done():
×
271
                        return false
×
272
                }
273
        }
274

275
        for i := 0; i < numRetries; i++ {
53,370✔
276
                tx, err := makeTx()
26,685✔
277
                if err != nil {
26,685✔
278
                        dbErr := MapSQLError(err)
×
279
                        log.Tracef("Failed to makeTx: err=%v, dbErr=%v", err,
×
280
                                dbErr)
×
281

×
282
                        if IsSerializationError(dbErr) {
×
283
                                // Nothing to roll back here, since we haven't
×
284
                                // even get a transaction yet. We'll just wait
×
285
                                // and try again.
×
286
                                if waitBeforeRetry(i) {
×
287
                                        continue
×
288
                                }
289
                        }
290

291
                        return dbErr
×
292
                }
293

294
                // Rollback is safe to call even if the tx is already closed,
295
                // so if the tx commits successfully, this is a no-op.
296
                defer func() {
53,370✔
297
                        _ = tx.Rollback()
26,685✔
298
                }()
26,685✔
299

300
                if bodyErr := txBody(tx); bodyErr != nil {
26,785✔
301
                        log.Tracef("Error in txBody: %v", bodyErr)
100✔
302

100✔
303
                        // Roll back the transaction, then attempt a random
100✔
304
                        // backoff and try again if the error was a
100✔
305
                        // serialization error.
100✔
306
                        if err := rollbackTx(tx); err != nil {
100✔
307
                                return MapSQLError(err)
×
308
                        }
×
309

310
                        dbErr := MapSQLError(bodyErr)
100✔
311
                        if IsSerializationError(dbErr) {
100✔
312
                                if waitBeforeRetry(i) {
×
313
                                        continue
×
314
                                }
315
                        }
316

317
                        return dbErr
100✔
318
                }
319

320
                // Commit transaction.
321
                if commitErr := tx.Commit(); commitErr != nil {
26,585✔
322
                        log.Tracef("Failed to commit tx: %v", commitErr)
×
323

×
324
                        // Roll back the transaction, then attempt a random
×
325
                        // backoff and try again if the error was a
×
326
                        // serialization error.
×
327
                        if err := rollbackTx(tx); err != nil {
×
328
                                return MapSQLError(err)
×
329
                        }
×
330

331
                        dbErr := MapSQLError(commitErr)
×
332
                        if IsSerializationError(dbErr) {
×
333
                                if waitBeforeRetry(i) {
×
334
                                        continue
×
335
                                }
336
                        }
337

338
                        return dbErr
×
339
                }
340

341
                return nil
26,585✔
342
        }
343

344
        // If we get to this point, then we weren't able to successfully commit
345
        // a tx given the max number of retries.
346
        return ErrRetriesExceeded
×
347
}
348

349
// ExecTx is a wrapper for txBody to abstract the creation and commit of a db
350
// transaction. The db transaction is embedded in a `*Queries` that txBody
351
// needs to use when executing each one of the queries that need to be applied
352
// atomically. This can be used by other storage interfaces to parameterize the
353
// type of query and options run, in order to have access to batched operations
354
// related to a storage object.
355
func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context,
356
        txOptions TxOptions, txBody func(Q) error, reset func()) error {
26,685✔
357

26,685✔
358
        makeTx := func() (Tx, error) {
53,370✔
359
                return t.BatchedQuerier.BeginTx(ctx, txOptions)
26,685✔
360
        }
26,685✔
361

362
        execTxBody := func(tx Tx) error {
53,370✔
363
                sqlTx, ok := tx.(*sql.Tx)
26,685✔
364
                if !ok {
26,685✔
365
                        return fmt.Errorf("expected *sql.Tx, got %T", tx)
×
366
                }
×
367

368
                reset()
26,685✔
369
                return txBody(t.createQuery(sqlTx))
26,685✔
370
        }
371

372
        onBackoff := func(retry int, delay time.Duration) {
26,685✔
373
                log.Tracef("Retrying transaction due to tx serialization "+
×
374
                        "error, attempt_number=%v, delay=%v", retry, delay)
×
375
        }
×
376

377
        rollbackTx := func(tx Tx) error {
26,785✔
378
                sqlTx, ok := tx.(*sql.Tx)
100✔
379
                if !ok {
100✔
380
                        return fmt.Errorf("expected *sql.Tx, got %T", tx)
×
381
                }
×
382

383
                _ = sqlTx.Rollback()
100✔
384

100✔
385
                return nil
100✔
386
        }
387

388
        return ExecuteSQLTransactionWithRetry(
26,685✔
389
                ctx, makeTx, rollbackTx, execTxBody, onBackoff,
26,685✔
390
                t.opts.numRetries,
26,685✔
391
        )
26,685✔
392
}
393

394
// DB is an interface that represents a generic SQL database. It provides
395
// methods to apply migrations and access the underlying database connection.
396
type DB interface {
397
        // GetBaseDB returns the underlying BaseDB instance.
398
        GetBaseDB() *BaseDB
399

400
        // ApplyAllMigrations applies all migrations to the database including
401
        // both sqlc and custom in-code migrations.
402
        ApplyAllMigrations(ctx context.Context,
403
                customMigrations []MigrationConfig) error
404
}
405

406
// BaseDB is the base database struct that each implementation can embed to
407
// gain some common functionality.
408
type BaseDB struct {
409
        *sql.DB
410

411
        *sqlc.Queries
412
}
413

414
// BeginTx wraps the normal sql specific BeginTx method with the TxOptions
415
// interface. This interface is then mapped to the concrete sql tx options
416
// struct.
417
func (s *BaseDB) BeginTx(ctx context.Context, opts TxOptions) (*sql.Tx, error) {
26,685✔
418
        sqlOptions := sql.TxOptions{
26,685✔
419
                Isolation: sql.LevelSerializable,
26,685✔
420
                ReadOnly:  opts.ReadOnly(),
26,685✔
421
        }
26,685✔
422

26,685✔
423
        return s.DB.BeginTx(ctx, &sqlOptions)
26,685✔
424
}
26,685✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc