• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15561477203

10 Jun 2025 01:54PM UTC coverage: 58.351% (-10.1%) from 68.487%
15561477203

Pull #9356

github

web-flow
Merge 6440b25db into c6d6d4c0b
Pull Request #9356: lnrpc: add incoming/outgoing channel ids filter to forwarding history request

33 of 36 new or added lines in 2 files covered. (91.67%)

28366 existing lines in 455 files now uncovered.

97715 of 167461 relevant lines covered (58.35%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.0
/batch/batch.go
1
package batch
2

3
import (
4
        "context"
5
        "errors"
6
        "sync"
7

8
        "github.com/lightningnetwork/lnd/sqldb"
9
)
10

11
// errSolo is a sentinel error indicating that the requester should re-run the
12
// operation in isolation.
13
var errSolo = errors.New(
14
        "batch function returned an error and should be re-run solo",
15
)
16

17
type request[Q any] struct {
18
        *Request[Q]
19
        errChan chan error
20
}
21

22
type batch[Q any] struct {
23
        db     sqldb.BatchedTx[Q]
24
        start  sync.Once
25
        reqs   []*request[Q]
26
        clear  func(b *batch[Q])
27
        locker sync.Locker
28
        txOpts sqldb.TxOptions
29
}
30

31
// trigger is the entry point for the batch and ensures that run is started at
32
// most once.
33
func (b *batch[Q]) trigger(ctx context.Context) {
3✔
34
        b.start.Do(func() {
6✔
35
                b.run(ctx)
3✔
36
        })
3✔
37
}
38

39
// run executes the current batch of requests. If any individual requests fail
40
// alongside others they will be retried by the caller.
41
func (b *batch[Q]) run(ctx context.Context) {
3✔
42
        // Clear the batch from its scheduler, ensuring that no new requests are
3✔
43
        // added to this batch.
3✔
44
        b.clear(b)
3✔
45

3✔
46
        // If a cache lock was provided, hold it until the this method returns.
3✔
47
        // This is critical for ensuring external consistency of the operation,
3✔
48
        // so that caches don't get out of sync with the on disk state.
3✔
49
        if b.locker != nil {
6✔
50
                b.locker.Lock()
3✔
51
                defer b.locker.Unlock()
3✔
52
        }
3✔
53

54
        // Apply the batch until a subset succeeds or all of them fail. Requests
55
        // that fail will be retried individually.
56
        for len(b.reqs) > 0 {
6✔
57
                var failIdx = -1
3✔
58
                err := b.db.ExecTx(ctx, b.txOpts, func(tx Q) error {
6✔
59
                        for i, req := range b.reqs {
6✔
60
                                err := req.Do(tx)
3✔
61
                                if err != nil {
3✔
UNCOV
62
                                        // If we get a serialization error, we
×
UNCOV
63
                                        // want the underlying SQL retry
×
UNCOV
64
                                        // mechanism to retry the entire batch.
×
UNCOV
65
                                        // Otherwise, we can succeed in an
×
UNCOV
66
                                        // sqldb retry and still re-execute the
×
UNCOV
67
                                        // failing request individually.
×
UNCOV
68
                                        dbErr := sqldb.MapSQLError(err)
×
UNCOV
69
                                        if !sqldb.IsSerializationError(dbErr) {
×
UNCOV
70
                                                failIdx = i
×
UNCOV
71

×
UNCOV
72
                                                return err
×
UNCOV
73
                                        }
×
74

UNCOV
75
                                        return dbErr
×
76
                                }
77
                        }
78
                        return nil
3✔
79
                }, func() {
3✔
80
                        for _, req := range b.reqs {
6✔
81
                                if req.Reset != nil {
6✔
82
                                        req.Reset()
3✔
83
                                }
3✔
84
                        }
85
                })
86

87
                // If a request's Update failed, extract it and re-run the
88
                // batch. The removed request will be retried individually by
89
                // the caller.
90
                if failIdx >= 0 {
3✔
UNCOV
91
                        req := b.reqs[failIdx]
×
UNCOV
92

×
UNCOV
93
                        // It's safe to shorten b.reqs here because the
×
UNCOV
94
                        // scheduler's batch no longer points to us.
×
UNCOV
95
                        b.reqs[failIdx] = b.reqs[len(b.reqs)-1]
×
UNCOV
96
                        b.reqs = b.reqs[:len(b.reqs)-1]
×
UNCOV
97

×
UNCOV
98
                        // Tell the submitter re-run it solo, continue with the
×
UNCOV
99
                        // rest of the batch.
×
UNCOV
100
                        req.errChan <- errSolo
×
UNCOV
101
                        continue
×
102
                }
103

104
                // None of the remaining requests failed, process the errors
105
                // using each request's OnCommit closure and return the error
106
                // to the requester. If no OnCommit closure is provided, simply
107
                // return the error directly.
108
                for _, req := range b.reqs {
6✔
109
                        if req.OnCommit != nil {
6✔
110
                                req.errChan <- req.OnCommit(err)
3✔
111
                        } else {
6✔
112
                                req.errChan <- err
3✔
113
                        }
3✔
114
                }
115

116
                return
3✔
117
        }
118
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc