• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15838907453

24 Jun 2025 01:26AM UTC coverage: 57.079% (-11.1%) from 68.172%
15838907453

Pull #9982

github

web-flow
Merge e42780be2 into 45c15646c
Pull Request #9982: lnwire+lnwallet: add LocalNonces field for splice nonce coordination w/ taproot channels

103 of 167 new or added lines in 5 files covered. (61.68%)

30191 existing lines in 463 files now uncovered.

96331 of 168768 relevant lines covered (57.08%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.67
/batch/scheduler.go
1
package batch
2

3
import (
4
        "context"
5
        "sync"
6
        "time"
7

8
        "github.com/lightningnetwork/lnd/sqldb"
9
)
10

11
// TimeScheduler is a batching engine that executes requests within a fixed
12
// horizon. When the first request is received, a TimeScheduler waits a
13
// configurable duration for other concurrent requests to join the batch. Once
14
// this time has elapsed, the batch is closed and executed. Subsequent requests
15
// are then added to a new batch which undergoes the same process.
16
type TimeScheduler[Q any] struct {
17
        db       sqldb.BatchedTx[Q]
18
        locker   sync.Locker
19
        duration time.Duration
20

21
        mu sync.Mutex
22
        b  *batch[Q]
23
}
24

25
// NewTimeScheduler initializes a new TimeScheduler with a fixed duration at
26
// which to schedule batches. If the operation needs to modify a higher-level
27
// cache, the cache's lock should be provided to so that external consistency
28
// can be maintained, as successful db operations will cause a request's
29
// OnCommit method to be executed while holding this lock.
30
func NewTimeScheduler[Q any](db sqldb.BatchedTx[Q], locker sync.Locker,
31
        duration time.Duration) *TimeScheduler[Q] {
1✔
32

1✔
33
        return &TimeScheduler[Q]{
1✔
34
                db:       db,
1✔
35
                locker:   locker,
1✔
36
                duration: duration,
1✔
37
        }
1✔
38
}
1✔
39

40
// Execute schedules the provided request for batch execution along with other
41
// concurrent requests. The request will be executed within a fixed horizon,
42
// parameterizeed by the duration of the scheduler. The error from the
43
// underlying operation is returned to the caller.
44
//
45
// NOTE: Part of the Scheduler interface.
46
func (s *TimeScheduler[Q]) Execute(ctx context.Context, r *Request[Q]) error {
1✔
47
        if r.Opts == nil {
1✔
UNCOV
48
                r.Opts = NewDefaultSchedulerOpts()
×
UNCOV
49
        }
×
50

51
        req := request[Q]{
1✔
52
                Request: r,
1✔
53
                errChan: make(chan error, 1),
1✔
54
        }
1✔
55

1✔
56
        // Add the request to the current batch. If the batch has been cleared
1✔
57
        // or no batch exists, create a new one.
1✔
58
        s.mu.Lock()
1✔
59
        if s.b == nil {
2✔
60
                s.b = &batch[Q]{
1✔
61
                        db:     s.db,
1✔
62
                        clear:  s.clear,
1✔
63
                        locker: s.locker,
1✔
64

1✔
65
                        // By default, we assume that the batch is read-only,
1✔
66
                        // and we only upgrade it to read-write if a request
1✔
67
                        // is added that is not read-only.
1✔
68
                        txOpts: sqldb.ReadTxOpt(),
1✔
69
                }
1✔
70
                trigger := s.b.trigger
1✔
71
                time.AfterFunc(s.duration, func() {
2✔
72
                        trigger(ctx)
1✔
73
                })
1✔
74
        }
75
        s.b.reqs = append(s.b.reqs, &req)
1✔
76

1✔
77
        // We only upgrade the batch to read-write if the new request is not
1✔
78
        // read-only. If it is already read-write, we don't need to do anything.
1✔
79
        if s.b.txOpts.ReadOnly() && !r.Opts.ReadOnly {
2✔
80
                s.b.txOpts = sqldb.WriteTxOpt()
1✔
81
        }
1✔
82

83
        // If this is a non-lazy request, we'll execute the batch immediately.
84
        if !r.Opts.Lazy {
2✔
85
                go s.b.trigger(ctx)
1✔
86
        }
1✔
87

88
        // We need to grab a reference to the batch's txOpts so that we can
89
        // pass it before we unlock the scheduler's mutex since the batch may
90
        // be set to nil before we access the txOpts below.
91
        txOpts := s.b.txOpts
1✔
92

1✔
93
        s.mu.Unlock()
1✔
94

1✔
95
        // Wait for the batch to process the request. If the batch didn't
1✔
96
        // ask us to execute the request individually, simply return the error.
1✔
97
        err := <-req.errChan
1✔
98
        if err != errSolo {
2✔
99
                return err
1✔
100
        }
1✔
101

102
        // Obtain exclusive access to the cache if this scheduler needs to
103
        // modify the cache in OnCommit.
UNCOV
104
        if s.locker != nil {
×
UNCOV
105
                s.locker.Lock()
×
UNCOV
106
                defer s.locker.Unlock()
×
UNCOV
107
        }
×
108

109
        // Otherwise, run the request on its own.
UNCOV
110
        commitErr := s.db.ExecTx(ctx, txOpts, func(tx Q) error {
×
UNCOV
111
                return req.Do(tx)
×
UNCOV
112
        }, func() {
×
UNCOV
113
                if req.Reset != nil {
×
UNCOV
114
                        req.Reset()
×
UNCOV
115
                }
×
116
        })
117

118
        // Finally, return the commit error directly or execute the OnCommit
119
        // closure with the commit error if present.
UNCOV
120
        if req.OnCommit != nil {
×
UNCOV
121
                return req.OnCommit(commitErr)
×
UNCOV
122
        }
×
123

UNCOV
124
        return commitErr
×
125
}
126

127
// clear resets the scheduler's batch to nil so that no more requests can be
128
// added.
129
func (s *TimeScheduler[Q]) clear(b *batch[Q]) {
1✔
130
        s.mu.Lock()
1✔
131
        if s.b == b {
2✔
132
                s.b = nil
1✔
133
        }
1✔
134
        s.mu.Unlock()
1✔
135
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc