• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15951470896

29 Jun 2025 04:23AM UTC coverage: 67.594% (-0.01%) from 67.606%
15951470896

Pull #9751

github

web-flow
Merge 599d9b051 into 6290edf14
Pull Request #9751: multi: update Go to 1.23.10 and update some packages

135088 of 199851 relevant lines covered (67.59%)

21909.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

100.0
/batch/batch.go
1
package batch
2

3
import (
4
        "context"
5
        "errors"
6
        "sync"
7

8
        "github.com/lightningnetwork/lnd/sqldb"
9
)
10

11
// errSolo is a sentinel error indicating that the requester should re-run the
12
// operation in isolation.
13
var errSolo = errors.New(
14
        "batch function returned an error and should be re-run solo",
15
)
16

17
type request[Q any] struct {
18
        *Request[Q]
19
        errChan chan error
20
}
21

22
type batch[Q any] struct {
23
        db     sqldb.BatchedTx[Q]
24
        start  sync.Once
25
        reqs   []*request[Q]
26
        clear  func(b *batch[Q])
27
        locker sync.Locker
28
        txOpts sqldb.TxOptions
29
}
30

31
// trigger is the entry point for the batch and ensures that run is started at
32
// most once.
33
func (b *batch[Q]) trigger(ctx context.Context) {
10,704✔
34
        b.start.Do(func() {
16,257✔
35
                b.run(ctx)
5,553✔
36
        })
5,553✔
37
}
38

39
// run executes the current batch of requests. If any individual requests fail
40
// alongside others they will be retried by the caller.
41
func (b *batch[Q]) run(ctx context.Context) {
5,553✔
42
        // Clear the batch from its scheduler, ensuring that no new requests are
5,553✔
43
        // added to this batch.
5,553✔
44
        b.clear(b)
5,553✔
45

5,553✔
46
        // If a cache lock was provided, hold it until the this method returns.
5,553✔
47
        // This is critical for ensuring external consistency of the operation,
5,553✔
48
        // so that caches don't get out of sync with the on disk state.
5,553✔
49
        if b.locker != nil {
9,943✔
50
                b.locker.Lock()
4,390✔
51
                defer b.locker.Unlock()
4,390✔
52
        }
4,390✔
53

54
        // Apply the batch until a subset succeeds or all of them fail. Requests
55
        // that fail will be retried individually.
56
        for len(b.reqs) > 0 {
11,106✔
57
                var failIdx = -1
5,553✔
58
                err := b.db.ExecTx(ctx, b.txOpts, func(tx Q) error {
11,106✔
59
                        for i, req := range b.reqs {
11,160✔
60
                                err := req.Do(tx)
5,607✔
61
                                if err != nil {
5,610✔
62
                                        // If we get a serialization error, we
3✔
63
                                        // want the underlying SQL retry
3✔
64
                                        // mechanism to retry the entire batch.
3✔
65
                                        // Otherwise, we can succeed in an
3✔
66
                                        // sqldb retry and still re-execute the
3✔
67
                                        // failing request individually.
3✔
68
                                        dbErr := sqldb.MapSQLError(err)
3✔
69
                                        if !sqldb.IsSerializationError(dbErr) {
5✔
70
                                                failIdx = i
2✔
71

2✔
72
                                                return err
2✔
73
                                        }
2✔
74

75
                                        return dbErr
1✔
76
                                }
77
                        }
78
                        return nil
5,550✔
79
                }, func() {
5,553✔
80
                        for _, req := range b.reqs {
11,160✔
81
                                if req.Reset != nil {
9,996✔
82
                                        req.Reset()
4,389✔
83
                                }
4,389✔
84
                        }
85
                })
86

87
                // If a request's Update failed, extract it and re-run the
88
                // batch. The removed request will be retried individually by
89
                // the caller.
90
                if failIdx >= 0 {
5,555✔
91
                        req := b.reqs[failIdx]
2✔
92

2✔
93
                        // It's safe to shorten b.reqs here because the
2✔
94
                        // scheduler's batch no longer points to us.
2✔
95
                        b.reqs[failIdx] = b.reqs[len(b.reqs)-1]
2✔
96
                        b.reqs = b.reqs[:len(b.reqs)-1]
2✔
97

2✔
98
                        // Tell the submitter re-run it solo, continue with the
2✔
99
                        // rest of the batch.
2✔
100
                        req.errChan <- errSolo
2✔
101
                        continue
2✔
102
                }
103

104
                // None of the remaining requests failed, process the errors
105
                // using each request's OnCommit closure and return the error
106
                // to the requester. If no OnCommit closure is provided, simply
107
                // return the error directly.
108
                for _, req := range b.reqs {
11,156✔
109
                        if req.OnCommit != nil {
9,993✔
110
                                req.errChan <- req.OnCommit(err)
4,388✔
111
                        } else {
5,608✔
112
                                req.errChan <- err
1,220✔
113
                        }
1,220✔
114
                }
115

116
                return
5,551✔
117
        }
118
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc