• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15249422085

26 May 2025 08:11AM UTC coverage: 57.977% (-11.0%) from 69.015%
15249422085

push

github

web-flow
Merge pull request #9853 from lightningnetwork/elle-graphSQL8-prep

graph/db: init SQLStore caches and batch schedulers

9 of 34 new or added lines in 4 files covered. (26.47%)

29283 existing lines in 458 files now uncovered.

96475 of 166402 relevant lines covered (57.98%)

1.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.22
/batch/scheduler.go
1
package batch
2

3
import (
4
        "context"
5
        "sync"
6
        "time"
7

8
        "github.com/lightningnetwork/lnd/sqldb"
9
)
10

11
// TimeScheduler is a batching engine that executes requests within a fixed
12
// horizon. When the first request is received, a TimeScheduler waits a
13
// configurable duration for other concurrent requests to join the batch. Once
14
// this time has elapsed, the batch is closed and executed. Subsequent requests
15
// are then added to a new batch which undergoes the same process.
16
type TimeScheduler[Q any] struct {
17
        db       sqldb.BatchedTx[Q]
18
        locker   sync.Locker
19
        duration time.Duration
20

21
        mu sync.Mutex
22
        b  *batch[Q]
23
}
24

25
// NewTimeScheduler initializes a new TimeScheduler with a fixed duration at
26
// which to schedule batches. If the operation needs to modify a higher-level
27
// cache, the cache's lock should be provided to so that external consistency
28
// can be maintained, as successful db operations will cause a request's
29
// OnCommit method to be executed while holding this lock.
30
func NewTimeScheduler[Q any](db sqldb.BatchedTx[Q], locker sync.Locker,
31
        duration time.Duration) *TimeScheduler[Q] {
2✔
32

2✔
33
        return &TimeScheduler[Q]{
2✔
34
                db:       db,
2✔
35
                locker:   locker,
2✔
36
                duration: duration,
2✔
37
        }
2✔
38
}
2✔
39

40
// Execute schedules the provided request for batch execution along with other
41
// concurrent requests. The request will be executed within a fixed horizon,
42
// parameterizeed by the duration of the scheduler. The error from the
43
// underlying operation is returned to the caller.
44
//
45
// NOTE: Part of the Scheduler interface.
46
func (s *TimeScheduler[Q]) Execute(ctx context.Context, r *Request[Q]) error {
2✔
47
        if r.Opts == nil {
2✔
UNCOV
48
                r.Opts = NewDefaultSchedulerOpts()
×
UNCOV
49
        }
×
50

51
        req := request[Q]{
2✔
52
                Request: r,
2✔
53
                errChan: make(chan error, 1),
2✔
54
        }
2✔
55

2✔
56
        // Add the request to the current batch. If the batch has been cleared
2✔
57
        // or no batch exists, create a new one.
2✔
58
        s.mu.Lock()
2✔
59
        if s.b == nil {
4✔
60
                s.b = &batch[Q]{
2✔
61
                        db:     s.db,
2✔
62
                        clear:  s.clear,
2✔
63
                        locker: s.locker,
2✔
64

2✔
65
                        // By default, we assume that the batch is read-only,
2✔
66
                        // and we only upgrade it to read-write if a request
2✔
67
                        // is added that is not read-only.
2✔
68
                        txOpts: txOpts{
2✔
69
                                readOnly: true,
2✔
70
                        },
2✔
71
                }
2✔
72
                trigger := s.b.trigger
2✔
73
                time.AfterFunc(s.duration, func() {
4✔
74
                        trigger(ctx)
2✔
75
                })
2✔
76
        }
77
        s.b.reqs = append(s.b.reqs, &req)
2✔
78

2✔
79
        // We only upgrade the batch to read-write if the new request is not
2✔
80
        // read-only. If it is already read-write, we don't need to do anything.
2✔
81
        if s.b.txOpts.readOnly && !r.Opts.ReadOnly {
4✔
82
                s.b.txOpts.readOnly = false
2✔
83
        }
2✔
84

85
        // If this is a non-lazy request, we'll execute the batch immediately.
86
        if !r.Opts.Lazy {
4✔
87
                go s.b.trigger(ctx)
2✔
88
        }
2✔
89

90
        // We need to grab a reference to the batch's txOpts so that we can
91
        // pass it before we unlock the scheduler's mutex since the batch may
92
        // be set to nil before we access the txOpts below.
93
        txOpts := s.b.txOpts
2✔
94

2✔
95
        s.mu.Unlock()
2✔
96

2✔
97
        // Wait for the batch to process the request. If the batch didn't
2✔
98
        // ask us to execute the request individually, simply return the error.
2✔
99
        err := <-req.errChan
2✔
100
        if err != errSolo {
4✔
101
                return err
2✔
102
        }
2✔
103

104
        // Obtain exclusive access to the cache if this scheduler needs to
105
        // modify the cache in OnCommit.
UNCOV
106
        if s.locker != nil {
×
UNCOV
107
                s.locker.Lock()
×
UNCOV
108
                defer s.locker.Unlock()
×
UNCOV
109
        }
×
110

111
        // Otherwise, run the request on its own.
UNCOV
112
        commitErr := s.db.ExecTx(ctx, &txOpts, func(tx Q) error {
×
UNCOV
113
                return req.Do(tx)
×
UNCOV
114
        }, func() {
×
UNCOV
115
                if req.Reset != nil {
×
116
                        req.Reset()
×
117
                }
×
118
        })
119

120
        // Finally, return the commit error directly or execute the OnCommit
121
        // closure with the commit error if present.
UNCOV
122
        if req.OnCommit != nil {
×
123
                return req.OnCommit(commitErr)
×
124
        }
×
125

UNCOV
126
        return commitErr
×
127
}
128

129
// clear resets the scheduler's batch to nil so that no more requests can be
130
// added.
131
func (s *TimeScheduler[Q]) clear(b *batch[Q]) {
2✔
132
        s.mu.Lock()
2✔
133
        if s.b == b {
4✔
134
                s.b = nil
2✔
135
        }
2✔
136
        s.mu.Unlock()
2✔
137
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc