• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 17236841262

26 Aug 2025 11:33AM UTC coverage: 66.228% (+8.9%) from 57.321%
17236841262

Pull #9147

github

web-flow
Merge 39fde1801 into 0c2f045f5
Pull Request #9147: [Part 1|3] Introduce SQL Payment schema into LND

129 of 1847 new or added lines in 13 files covered. (6.98%)

20 existing lines in 7 files now uncovered.

136069 of 205456 relevant lines covered (66.23%)

21357.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.43
/sqldb/interfaces.go
1
package sqldb
2

3
import (
4
        "context"
5
        "database/sql"
6
        "fmt"
7
        "math"
8
        "math/rand"
9
        prand "math/rand"
10
        "time"
11

12
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
13
)
14

15
var (
16
        // DefaultStoreTimeout is the default timeout used for any interaction
17
        // with the storage/database.
18
        DefaultStoreTimeout = time.Second * 10
19
)
20

21
const (
22
        // DefaultNumTxRetries is the default number of times we'll retry a
23
        // transaction if it fails with an error that permits transaction
24
        // repetition.
25
        DefaultNumTxRetries = 20
26

27
        // DefaultRetryDelay is the default delay between retries. This will be
28
        // used to generate a random delay between 0 and this value.
29
        DefaultRetryDelay = time.Millisecond * 50
30

31
        // DefaultMaxRetryDelay is the default maximum delay between retries.
32
        DefaultMaxRetryDelay = time.Second
33
)
34

35
// TxOptions represents a set of options one can use to control what type of
36
// database transaction is created. Transaction can be either read or write.
37
type TxOptions interface {
38
        // ReadOnly returns true if the transaction should be read only.
39
        ReadOnly() bool
40
}
41

42
// txOptions is a concrete implementation of the TxOptions interface.
43
type txOptions struct {
44
        // readOnly indicates if the transaction should be read-only.
45
        readOnly bool
46
}
47

48
// ReadOnly returns true if the transaction should be read only.
49
//
50
// NOTE: This is part of the TxOptions interface.
51
func (t *txOptions) ReadOnly() bool {
38,182✔
52
        return t.readOnly
38,182✔
53
}
38,182✔
54

55
// WriteTxOpt returns a TxOptions that indicates that the transaction
56
// should be a write transaction.
57
func WriteTxOpt() TxOptions {
11,067✔
58
        return &txOptions{
11,067✔
59
                readOnly: false,
11,067✔
60
        }
11,067✔
61
}
11,067✔
62

63
// ReadTxOpt returns a TxOptions that indicates that the transaction
64
// should be a read-only transaction.
65
func ReadTxOpt() TxOptions {
26,795✔
66
        return &txOptions{
26,795✔
67
                readOnly: true,
26,795✔
68
        }
26,795✔
69
}
26,795✔
70

71
// BatchedTx is a generic interface that represents the ability to execute
72
// several operations to a given storage interface in a single atomic
73
// transaction. Typically, Q here will be some subset of the main sqlc.Querier
74
// interface allowing it to only depend on the routines it needs to implement
75
// any additional business logic.
76
type BatchedTx[Q any] interface {
77
        // ExecTx will execute the passed txBody, operating upon generic
78
        // parameter Q (usually a storage interface) in a single transaction.
79
        //
80
        // The set of TxOptions are passed in order to allow the caller to
81
        // specify if a transaction should be read-only and optionally what
82
        // type of concurrency control should be used.
83
        ExecTx(ctx context.Context, txOptions TxOptions,
84
                txBody func(Q) error, reset func()) error
85
}
86

87
// Tx represents a database transaction that can be committed or rolled back.
88
type Tx interface {
89
        // Commit commits the database transaction, an error should be returned
90
        // if the commit isn't possible.
91
        Commit() error
92

93
        // Rollback rolls back an incomplete database transaction.
94
        // Transactions that were able to be committed can still call this as a
95
        // noop.
96
        Rollback() error
97
}
98

99
// QueryCreator is a generic function that's used to create a Querier, which is
100
// a type of interface that implements storage related methods from a database
101
// transaction. This will be used to instantiate an object callers can use to
102
// apply multiple modifications to an object interface in a single atomic
103
// transaction.
104
type QueryCreator[Q any] func(*sql.Tx) Q
105

106
// BatchedQuerier is a generic interface that allows callers to create a new
107
// database transaction based on an abstract type that implements the TxOptions
108
// interface.
109
type BatchedQuerier interface {
110
        // Querier is the underlying query source, this is in place so we can
111
        // pass a BatchedQuerier implementation directly into objects that
112
        // create a batched version of the normal methods they need.
113
        sqlc.Querier
114

115
        // BeginTx creates a new database transaction given the set of
116
        // transaction options.
117
        BeginTx(ctx context.Context, options TxOptions) (*sql.Tx, error)
118
}
119

120
// txExecutorOptions is a struct that holds the options for the transaction
121
// executor. This can be used to do things like retry a transaction due to an
122
// error a certain amount of times.
123
type txExecutorOptions struct {
124
        numRetries int
125
        retryDelay time.Duration
126
}
127

128
// defaultTxExecutorOptions returns the default options for the transaction
129
// executor.
130
func defaultTxExecutorOptions() *txExecutorOptions {
1,058✔
131
        return &txExecutorOptions{
1,058✔
132
                numRetries: DefaultNumTxRetries,
1,058✔
133
                retryDelay: DefaultRetryDelay,
1,058✔
134
        }
1,058✔
135
}
1,058✔
136

137
// randRetryDelay returns a random retry delay between 0 and the configured max
138
// delay.
139
func (t *txExecutorOptions) randRetryDelay() time.Duration {
×
140
        return time.Duration(prand.Int63n(int64(t.retryDelay))) //nolint:gosec
×
141
}
×
142

143
// TxExecutorOption is a functional option that allows us to pass in optional
144
// argument when creating the executor.
145
type TxExecutorOption func(*txExecutorOptions)
146

147
// WithTxRetries is a functional option that allows us to specify the number of
148
// times a transaction should be retried if it fails with a repeatable error.
149
func WithTxRetries(numRetries int) TxExecutorOption {
×
150
        return func(o *txExecutorOptions) {
×
151
                o.numRetries = numRetries
×
152
        }
×
153
}
154

155
// WithTxRetryDelay is a functional option that allows us to specify the delay
156
// to wait before a transaction is retried.
157
func WithTxRetryDelay(delay time.Duration) TxExecutorOption {
×
158
        return func(o *txExecutorOptions) {
×
159
                o.retryDelay = delay
×
160
        }
×
161
}
162

163
// TransactionExecutor is a generic struct that abstracts away from the type of
164
// query a type needs to run under a database transaction, and also the set of
165
// options for that transaction. The QueryCreator is used to create a query
166
// given a database transaction created by the BatchedQuerier.
167
type TransactionExecutor[Query any] struct {
168
        BatchedQuerier
169

170
        createQuery QueryCreator[Query]
171

172
        opts *txExecutorOptions
173
}
174

175
// NewTransactionExecutor creates a new instance of a TransactionExecutor given
176
// a Querier query object and a concrete type for the type of transactions the
177
// Querier understands.
178
func NewTransactionExecutor[Querier any](db BatchedQuerier,
179
        createQuery QueryCreator[Querier],
180
        opts ...TxExecutorOption) *TransactionExecutor[Querier] {
1,058✔
181

1,058✔
182
        txOpts := defaultTxExecutorOptions()
1,058✔
183
        for _, optFunc := range opts {
1,058✔
184
                optFunc(txOpts)
×
185
        }
×
186

187
        return &TransactionExecutor[Querier]{
1,058✔
188
                BatchedQuerier: db,
1,058✔
189
                createQuery:    createQuery,
1,058✔
190
                opts:           txOpts,
1,058✔
191
        }
1,058✔
192
}
193

194
// randRetryDelay returns a random retry delay between -50% and +50% of the
195
// configured delay that is doubled for each attempt and capped at a max value.
196
func randRetryDelay(initialRetryDelay, maxRetryDelay time.Duration,
197
        attempt int) time.Duration {
1✔
198

1✔
199
        halfDelay := initialRetryDelay / 2
1✔
200
        randDelay := rand.Int63n(int64(initialRetryDelay)) //nolint:gosec
1✔
201

1✔
202
        // 50% plus 0%-100% gives us the range of 50%-150%.
1✔
203
        initialDelay := halfDelay + time.Duration(randDelay)
1✔
204

1✔
205
        // If this is the first attempt, we just return the initial delay.
1✔
206
        if attempt == 0 {
2✔
207
                return initialDelay
1✔
208
        }
1✔
209

210
        // For each subsequent delay, we double the initial delay. This still
211
        // gives us a somewhat random delay, but it still increases with each
212
        // attempt. If we double something n times, that's the same as
213
        // multiplying the value with 2^n. We limit the power to 32 to avoid
214
        // overflows.
215
        factor := time.Duration(math.Pow(2, min(float64(attempt), 32)))
×
216
        actualDelay := initialDelay * factor
×
217

×
218
        // Cap the delay at the maximum configured value.
×
219
        if actualDelay > maxRetryDelay {
×
220
                return maxRetryDelay
×
221
        }
×
222

223
        return actualDelay
×
224
}
225

226
// MakeTx is a function that creates a new transaction. It returns a Tx and an
227
// error if the transaction cannot be created. This is used to abstract the
228
// creation of a transaction from the actual transaction logic in order to be
229
// able to reuse the transaction retry logic in other packages.
230
type MakeTx func() (Tx, error)
231

232
// TxBody represents the function type for transactions. It returns an
233
// error to indicate success or failure.
234
type TxBody func(tx Tx) error
235

236
// RollbackTx is a function that is called when a transaction needs to be rolled
237
// back due to a serialization error. By using this intermediate function, we
238
// can avoid having to return rollback errors that are not actionable by the
239
// caller.
240
type RollbackTx func(tx Tx) error
241

242
// OnBackoff is a function that is called when a transaction is retried due to a
243
// serialization error. The function is called with the retry attempt number and
244
// the delay before the next retry.
245
type OnBackoff func(retry int, delay time.Duration)
246

247
// ExecuteSQLTransactionWithRetry is a helper function that executes a
248
// transaction with retry logic. It will retry the transaction if it fails with
249
// a serialization error. The function will return an error if the transaction
250
// fails with a non-retryable error, the context is cancelled or the number of
251
// retries is exceeded.
252
func ExecuteSQLTransactionWithRetry(ctx context.Context, makeTx MakeTx,
253
        rollbackTx RollbackTx, txBody TxBody, onBackoff OnBackoff,
254
        numRetries int) error {
27,256✔
255

27,256✔
256
        waitBeforeRetry := func(attemptNumber int) bool {
27,257✔
257
                retryDelay := randRetryDelay(
1✔
258
                        DefaultRetryDelay, DefaultMaxRetryDelay, attemptNumber,
1✔
259
                )
1✔
260

1✔
261
                onBackoff(attemptNumber, retryDelay)
1✔
262

1✔
263
                select {
1✔
264
                // Before we try again, we'll wait with a random backoff based
265
                // on the retry delay.
266
                case <-time.After(retryDelay):
1✔
267
                        return true
1✔
268

269
                // If the daemon is shutting down, then we'll exit early.
270
                case <-ctx.Done():
×
271
                        return false
×
272
                }
273
        }
274

275
        for i := 0; i < numRetries; i++ {
54,513✔
276
                tx, err := makeTx()
27,257✔
277
                if err != nil {
27,257✔
278
                        dbErr := MapSQLError(err)
×
279
                        log.Tracef("Failed to makeTx: err=%v, dbErr=%v", err,
×
280
                                dbErr)
×
281

×
282
                        if IsSerializationError(dbErr) {
×
283
                                // Nothing to roll back here, since we haven't
×
284
                                // even get a transaction yet. We'll just wait
×
285
                                // and try again.
×
286
                                if waitBeforeRetry(i) {
×
287
                                        continue
×
288
                                }
289
                        }
290

291
                        return dbErr
×
292
                }
293

294
                // Rollback is safe to call even if the tx is already closed,
295
                // so if the tx commits successfully, this is a no-op.
296
                defer func() {
54,514✔
297
                        _ = tx.Rollback()
27,257✔
298
                }()
27,257✔
299

300
                if bodyErr := txBody(tx); bodyErr != nil {
27,358✔
301
                        log.Tracef("Error in txBody: %v", bodyErr)
101✔
302

101✔
303
                        // Roll back the transaction, then attempt a random
101✔
304
                        // backoff and try again if the error was a
101✔
305
                        // serialization error.
101✔
306
                        if err := rollbackTx(tx); err != nil {
101✔
307
                                return MapSQLError(err)
×
308
                        }
×
309

310
                        dbErr := MapSQLError(bodyErr)
101✔
311
                        if IsSerializationError(dbErr) {
102✔
312
                                if waitBeforeRetry(i) {
2✔
313
                                        continue
1✔
314
                                }
315
                        }
316

317
                        return dbErr
100✔
318
                }
319

320
                // Commit transaction.
321
                if commitErr := tx.Commit(); commitErr != nil {
27,156✔
322
                        log.Tracef("Failed to commit tx: %v", commitErr)
×
323

×
324
                        // Roll back the transaction, then attempt a random
×
325
                        // backoff and try again if the error was a
×
326
                        // serialization error.
×
327
                        if err := rollbackTx(tx); err != nil {
×
328
                                return MapSQLError(err)
×
329
                        }
×
330

331
                        dbErr := MapSQLError(commitErr)
×
332
                        if IsSerializationError(dbErr) {
×
333
                                if waitBeforeRetry(i) {
×
334
                                        continue
×
335
                                }
336
                        }
337

338
                        return dbErr
×
339
                }
340

341
                return nil
27,156✔
342
        }
343

344
        // If we get to this point, then we weren't able to successfully commit
345
        // a tx given the max number of retries.
346
        return ErrRetriesExceeded
×
347
}
348

349
// ExecTx is a wrapper for txBody to abstract the creation and commit of a db
350
// transaction. The db transaction is embedded in a `*Queries` that txBody
351
// needs to use when executing each one of the queries that need to be applied
352
// atomically. This can be used by other storage interfaces to parameterize the
353
// type of query and options run, in order to have access to batched operations
354
// related to a storage object.
355
func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context,
356
        txOptions TxOptions, txBody func(Q) error, reset func()) error {
27,256✔
357

27,256✔
358
        makeTx := func() (Tx, error) {
54,513✔
359
                return t.BatchedQuerier.BeginTx(ctx, txOptions)
27,257✔
360
        }
27,257✔
361

362
        execTxBody := func(tx Tx) error {
54,513✔
363
                sqlTx, ok := tx.(*sql.Tx)
27,257✔
364
                if !ok {
27,257✔
365
                        return fmt.Errorf("expected *sql.Tx, got %T", tx)
×
366
                }
×
367

368
                if reset == nil {
27,257✔
NEW
369
                        return fmt.Errorf("reset function is nil")
×
NEW
370
                }
×
371

372
                reset()
27,257✔
373

27,257✔
374
                return txBody(t.createQuery(sqlTx))
27,257✔
375
        }
376

377
        onBackoff := func(retry int, delay time.Duration) {
27,257✔
378
                log.Tracef("Retrying transaction due to tx serialization "+
1✔
379
                        "error, attempt_number=%v, delay=%v", retry, delay)
1✔
380
        }
1✔
381

382
        rollbackTx := func(tx Tx) error {
27,357✔
383
                sqlTx, ok := tx.(*sql.Tx)
101✔
384
                if !ok {
101✔
385
                        return fmt.Errorf("expected *sql.Tx, got %T", tx)
×
386
                }
×
387

388
                _ = sqlTx.Rollback()
101✔
389

101✔
390
                return nil
101✔
391
        }
392

393
        return ExecuteSQLTransactionWithRetry(
27,256✔
394
                ctx, makeTx, rollbackTx, execTxBody, onBackoff,
27,256✔
395
                t.opts.numRetries,
27,256✔
396
        )
27,256✔
397
}
398

399
// DB is an interface that represents a generic SQL database. It provides
400
// methods to apply migrations and access the underlying database connection.
401
type DB interface {
402
        // GetBaseDB returns the underlying BaseDB instance.
403
        GetBaseDB() *BaseDB
404

405
        // ApplyAllMigrations applies all migrations to the database including
406
        // both sqlc and custom in-code migrations.
407
        ApplyAllMigrations(ctx context.Context,
408
                customMigrations []MigrationConfig) error
409
}
410

411
// BaseDB is the base database struct that each implementation can embed to
412
// gain some common functionality.
413
type BaseDB struct {
414
        *sql.DB
415

416
        *sqlc.Queries
417
}
418

419
// BeginTx wraps the normal sql specific BeginTx method with the TxOptions
420
// interface. This interface is then mapped to the concrete sql tx options
421
// struct.
422
func (s *BaseDB) BeginTx(ctx context.Context, opts TxOptions) (*sql.Tx, error) {
27,257✔
423
        sqlOptions := sql.TxOptions{
27,257✔
424
                Isolation: sql.LevelSerializable,
27,257✔
425
                ReadOnly:  opts.ReadOnly(),
27,257✔
426
        }
27,257✔
427

27,257✔
428
        return s.DB.BeginTx(ctx, &sqlOptions)
27,257✔
429
}
27,257✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc