• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12980140662

27 Jan 2025 01:05AM UTC coverage: 58.791% (+0.02%) from 58.774%
12980140662

Pull #9451

github

Juneezee
docs: add release note for #9451

Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
Pull Request #9451: refactor: replace min/max helpers with built-in min/max

11 of 12 new or added lines in 6 files covered. (91.67%)

23 existing lines in 9 files now uncovered.

136032 of 231381 relevant lines covered (58.79%)

19302.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.65
/sqldb/interfaces.go
1
package sqldb
2

3
import (
4
        "context"
5
        "database/sql"
6
        "fmt"
7
        "math"
8
        "math/rand"
9
        prand "math/rand"
10
        "time"
11

12
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
13
)
14

15
var (
16
        // DefaultStoreTimeout is the default timeout used for any interaction
17
        // with the storage/database.
18
        DefaultStoreTimeout = time.Second * 10
19
)
20

21
const (
22
        // DefaultNumTxRetries is the default number of times we'll retry a
23
        // transaction if it fails with an error that permits transaction
24
        // repetition.
25
        DefaultNumTxRetries = 20
26

27
        // DefaultRetryDelay is the default delay between retries. This will be
28
        // used to generate a random delay between 0 and this value.
29
        DefaultRetryDelay = time.Millisecond * 50
30

31
        // DefaultMaxRetryDelay is the default maximum delay between retries.
32
        DefaultMaxRetryDelay = time.Second
33
)
34

35
// TxOptions represents a set of options one can use to control what type of
36
// database transaction is created. Transaction can be either read or write.
37
type TxOptions interface {
38
        // ReadOnly returns true if the transaction should be read only.
39
        ReadOnly() bool
40
}
41

42
// BatchedTx is a generic interface that represents the ability to execute
43
// several operations to a given storage interface in a single atomic
44
// transaction. Typically, Q here will be some subset of the main sqlc.Querier
45
// interface allowing it to only depend on the routines it needs to implement
46
// any additional business logic.
47
type BatchedTx[Q any] interface {
48
        // ExecTx will execute the passed txBody, operating upon generic
49
        // parameter Q (usually a storage interface) in a single transaction.
50
        //
51
        // The set of TxOptions are passed in order to allow the caller to
52
        // specify if a transaction should be read-only and optionally what
53
        // type of concurrency control should be used.
54
        ExecTx(ctx context.Context, txOptions TxOptions,
55
                txBody func(Q) error, reset func()) error
56
}
57

58
// Tx represents a database transaction that can be committed or rolled back.
59
type Tx interface {
60
        // Commit commits the database transaction, an error should be returned
61
        // if the commit isn't possible.
62
        Commit() error
63

64
        // Rollback rolls back an incomplete database transaction.
65
        // Transactions that were able to be committed can still call this as a
66
        // noop.
67
        Rollback() error
68
}
69

70
// QueryCreator is a generic function that's used to create a Querier, which is
71
// a type of interface that implements storage related methods from a database
72
// transaction. This will be used to instantiate an object callers can use to
73
// apply multiple modifications to an object interface in a single atomic
74
// transaction.
75
type QueryCreator[Q any] func(*sql.Tx) Q
76

77
// BatchedQuerier is a generic interface that allows callers to create a new
78
// database transaction based on an abstract type that implements the TxOptions
79
// interface.
80
type BatchedQuerier interface {
81
        // Querier is the underlying query source, this is in place so we can
82
        // pass a BatchedQuerier implementation directly into objects that
83
        // create a batched version of the normal methods they need.
84
        sqlc.Querier
85

86
        // BeginTx creates a new database transaction given the set of
87
        // transaction options.
88
        BeginTx(ctx context.Context, options TxOptions) (*sql.Tx, error)
89
}
90

91
// txExecutorOptions is a struct that holds the options for the transaction
92
// executor. This can be used to do things like retry a transaction due to an
93
// error a certain amount of times.
94
type txExecutorOptions struct {
95
        numRetries int
96
        retryDelay time.Duration
97
}
98

99
// defaultTxExecutorOptions returns the default options for the transaction
100
// executor.
101
func defaultTxExecutorOptions() *txExecutorOptions {
1,034✔
102
        return &txExecutorOptions{
1,034✔
103
                numRetries: DefaultNumTxRetries,
1,034✔
104
                retryDelay: DefaultRetryDelay,
1,034✔
105
        }
1,034✔
106
}
1,034✔
107

108
// randRetryDelay returns a random retry delay between 0 and the configured max
109
// delay.
110
func (t *txExecutorOptions) randRetryDelay() time.Duration {
×
111
        return time.Duration(prand.Int63n(int64(t.retryDelay))) //nolint:gosec
×
112
}
×
113

114
// TxExecutorOption is a functional option that allows us to pass in optional
115
// argument when creating the executor.
116
type TxExecutorOption func(*txExecutorOptions)
117

118
// WithTxRetries is a functional option that allows us to specify the number of
119
// times a transaction should be retried if it fails with a repeatable error.
120
func WithTxRetries(numRetries int) TxExecutorOption {
×
121
        return func(o *txExecutorOptions) {
×
122
                o.numRetries = numRetries
×
123
        }
×
124
}
125

126
// WithTxRetryDelay is a functional option that allows us to specify the delay
127
// to wait before a transaction is retried.
128
func WithTxRetryDelay(delay time.Duration) TxExecutorOption {
×
129
        return func(o *txExecutorOptions) {
×
130
                o.retryDelay = delay
×
131
        }
×
132
}
133

134
// TransactionExecutor is a generic struct that abstracts away from the type of
135
// query a type needs to run under a database transaction, and also the set of
136
// options for that transaction. The QueryCreator is used to create a query
137
// given a database transaction created by the BatchedQuerier.
138
type TransactionExecutor[Query any] struct {
139
        BatchedQuerier
140

141
        createQuery QueryCreator[Query]
142

143
        opts *txExecutorOptions
144
}
145

146
// NewTransactionExecutor creates a new instance of a TransactionExecutor given
147
// a Querier query object and a concrete type for the type of transactions the
148
// Querier understands.
149
func NewTransactionExecutor[Querier any](db BatchedQuerier,
150
        createQuery QueryCreator[Querier],
151
        opts ...TxExecutorOption) *TransactionExecutor[Querier] {
1,034✔
152

1,034✔
153
        txOpts := defaultTxExecutorOptions()
1,034✔
154
        for _, optFunc := range opts {
1,034✔
155
                optFunc(txOpts)
×
156
        }
×
157

158
        return &TransactionExecutor[Querier]{
1,034✔
159
                BatchedQuerier: db,
1,034✔
160
                createQuery:    createQuery,
1,034✔
161
                opts:           txOpts,
1,034✔
162
        }
1,034✔
163
}
164

165
// randRetryDelay returns a random retry delay between -50% and +50% of the
166
// configured delay that is doubled for each attempt and capped at a max value.
167
func randRetryDelay(initialRetryDelay, maxRetryDelay time.Duration,
168
        attempt int) time.Duration {
×
169

×
170
        halfDelay := initialRetryDelay / 2
×
171
        randDelay := rand.Int63n(int64(initialRetryDelay)) //nolint:gosec
×
172

×
173
        // 50% plus 0%-100% gives us the range of 50%-150%.
×
174
        initialDelay := halfDelay + time.Duration(randDelay)
×
175

×
176
        // If this is the first attempt, we just return the initial delay.
×
177
        if attempt == 0 {
×
178
                return initialDelay
×
179
        }
×
180

181
        // For each subsequent delay, we double the initial delay. This still
182
        // gives us a somewhat random delay, but it still increases with each
183
        // attempt. If we double something n times, that's the same as
184
        // multiplying the value with 2^n. We limit the power to 32 to avoid
185
        // overflows.
NEW
186
        factor := time.Duration(math.Pow(2, math.Min(float64(attempt), 32)))
×
187
        actualDelay := initialDelay * factor
×
188

×
189
        // Cap the delay at the maximum configured value.
×
190
        if actualDelay > maxRetryDelay {
×
191
                return maxRetryDelay
×
192
        }
×
193

194
        return actualDelay
×
195
}
196

197
// MakeTx is a function that creates a new transaction. It returns a Tx and an
198
// error if the transaction cannot be created. This is used to abstract the
199
// creation of a transaction from the actual transaction logic in order to be
200
// able to reuse the transaction retry logic in other packages.
201
type MakeTx func() (Tx, error)
202

203
// TxBody represents the function type for transactions. It returns an
204
// error to indicate success or failure.
205
type TxBody func(tx Tx) error
206

207
// RollbackTx is a function that is called when a transaction needs to be rolled
208
// back due to a serialization error. By using this intermediate function, we
209
// can avoid having to return rollback errors that are not actionable by the
210
// caller.
211
type RollbackTx func(tx Tx) error
212

213
// OnBackoff is a function that is called when a transaction is retried due to a
214
// serialization error. The function is called with the retry attempt number and
215
// the delay before the next retry.
216
type OnBackoff func(retry int, delay time.Duration)
217

218
// ExecuteSQLTransactionWithRetry is a helper function that executes a
219
// transaction with retry logic. It will retry the transaction if it fails with
220
// a serialization error. The function will return an error if the transaction
221
// fails with a non-retryable error, the context is cancelled or the number of
222
// retries is exceeded.
223
func ExecuteSQLTransactionWithRetry(ctx context.Context, makeTx MakeTx,
224
        rollbackTx RollbackTx, txBody TxBody, onBackoff OnBackoff,
225
        numRetries int) error {
26,288✔
226

26,288✔
227
        waitBeforeRetry := func(attemptNumber int) bool {
26,288✔
228
                retryDelay := randRetryDelay(
×
229
                        DefaultRetryDelay, DefaultMaxRetryDelay, attemptNumber,
×
230
                )
×
231

×
232
                onBackoff(attemptNumber, retryDelay)
×
233

×
234
                select {
×
235
                // Before we try again, we'll wait with a random backoff based
236
                // on the retry delay.
237
                case <-time.After(retryDelay):
×
238
                        return true
×
239

240
                // If the daemon is shutting down, then we'll exit early.
241
                case <-ctx.Done():
×
242
                        return false
×
243
                }
244
        }
245

246
        for i := 0; i < numRetries; i++ {
52,576✔
247
                tx, err := makeTx()
26,288✔
248
                if err != nil {
26,288✔
249
                        dbErr := MapSQLError(err)
×
250
                        if IsSerializationError(dbErr) {
×
251
                                // Nothing to roll back here, since we haven't
×
252
                                // even get a transaction yet. We'll just wait
×
253
                                // and try again.
×
254
                                if waitBeforeRetry(i) {
×
255
                                        continue
×
256
                                }
257
                        }
258

259
                        return dbErr
×
260
                }
261

262
                // Rollback is safe to call even if the tx is already closed,
263
                // so if the tx commits successfully, this is a no-op.
264
                defer func() {
52,576✔
265
                        _ = tx.Rollback()
26,288✔
266
                }()
26,288✔
267

268
                if bodyErr := txBody(tx); bodyErr != nil {
26,388✔
269
                        // Roll back the transaction, then attempt a random
100✔
270
                        // backoff and try again if the error was a
100✔
271
                        // serialization error.
100✔
272
                        if err := rollbackTx(tx); err != nil {
100✔
273
                                return MapSQLError(err)
×
274
                        }
×
275

276
                        dbErr := MapSQLError(bodyErr)
100✔
277
                        if IsSerializationError(dbErr) {
100✔
278
                                if waitBeforeRetry(i) {
×
279
                                        continue
×
280
                                }
281
                        }
282

283
                        return dbErr
100✔
284
                }
285

286
                // Commit transaction.
287
                if commitErr := tx.Commit(); commitErr != nil {
26,188✔
288
                        // Roll back the transaction, then attempt a random
×
289
                        // backoff and try again if the error was a
×
290
                        // serialization error.
×
291
                        if err := rollbackTx(tx); err != nil {
×
292
                                return MapSQLError(err)
×
293
                        }
×
294

295
                        dbErr := MapSQLError(commitErr)
×
296
                        if IsSerializationError(dbErr) {
×
297
                                if waitBeforeRetry(i) {
×
298
                                        continue
×
299
                                }
300
                        }
301

302
                        return dbErr
×
303
                }
304

305
                return nil
26,188✔
306
        }
307

308
        // If we get to this point, then we weren't able to successfully commit
309
        // a tx given the max number of retries.
310
        return ErrRetriesExceeded
×
311
}
312

313
// ExecTx is a wrapper for txBody to abstract the creation and commit of a db
314
// transaction. The db transaction is embedded in a `*Queries` that txBody
315
// needs to use when executing each one of the queries that need to be applied
316
// atomically. This can be used by other storage interfaces to parameterize the
317
// type of query and options run, in order to have access to batched operations
318
// related to a storage object.
319
func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context,
320
        txOptions TxOptions, txBody func(Q) error, reset func()) error {
26,288✔
321

26,288✔
322
        makeTx := func() (Tx, error) {
52,576✔
323
                return t.BatchedQuerier.BeginTx(ctx, txOptions)
26,288✔
324
        }
26,288✔
325

326
        execTxBody := func(tx Tx) error {
52,576✔
327
                sqlTx, ok := tx.(*sql.Tx)
26,288✔
328
                if !ok {
26,288✔
329
                        return fmt.Errorf("expected *sql.Tx, got %T", tx)
×
330
                }
×
331

332
                reset()
26,288✔
333
                return txBody(t.createQuery(sqlTx))
26,288✔
334
        }
335

336
        onBackoff := func(retry int, delay time.Duration) {
26,288✔
337
                log.Tracef("Retrying transaction due to tx serialization "+
×
338
                        "error, attempt_number=%v, delay=%v", retry, delay)
×
339
        }
×
340

341
        rollbackTx := func(tx Tx) error {
26,388✔
342
                sqlTx, ok := tx.(*sql.Tx)
100✔
343
                if !ok {
100✔
344
                        return fmt.Errorf("expected *sql.Tx, got %T", tx)
×
345
                }
×
346

347
                _ = sqlTx.Rollback()
100✔
348

100✔
349
                return nil
100✔
350
        }
351

352
        return ExecuteSQLTransactionWithRetry(
26,288✔
353
                ctx, makeTx, rollbackTx, execTxBody, onBackoff,
26,288✔
354
                t.opts.numRetries,
26,288✔
355
        )
26,288✔
356
}
357

358
// DB is an interface that represents a generic SQL database. It provides
359
// methods to apply migrations and access the underlying database connection.
360
type DB interface {
361
        // GetBaseDB returns the underlying BaseDB instance.
362
        GetBaseDB() *BaseDB
363

364
        // ApplyAllMigrations applies all migrations to the database including
365
        // both sqlc and custom in-code migrations.
366
        ApplyAllMigrations(ctx context.Context,
367
                customMigrations []MigrationConfig) error
368
}
369

370
// BaseDB is the base database struct that each implementation can embed to
371
// gain some common functionality.
372
type BaseDB struct {
373
        *sql.DB
374

375
        *sqlc.Queries
376
}
377

378
// BeginTx wraps the normal sql specific BeginTx method with the TxOptions
379
// interface. This interface is then mapped to the concrete sql tx options
380
// struct.
381
func (s *BaseDB) BeginTx(ctx context.Context, opts TxOptions) (*sql.Tx, error) {
26,288✔
382
        sqlOptions := sql.TxOptions{
26,288✔
383
                Isolation: sql.LevelSerializable,
26,288✔
384
                ReadOnly:  opts.ReadOnly(),
26,288✔
385
        }
26,288✔
386

26,288✔
387
        return s.DB.BeginTx(ctx, &sqlOptions)
26,288✔
388
}
26,288✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc