• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15736109134

18 Jun 2025 02:46PM UTC coverage: 58.197% (-10.1%) from 68.248%
15736109134

Pull #9752

github

web-flow
Merge d2634a68c into 31c74f20f
Pull Request #9752: routerrpc: reject payment to invoice that don't have payment secret or blinded paths

6 of 13 new or added lines in 2 files covered. (46.15%)

28331 existing lines in 455 files now uncovered.

97860 of 168153 relevant lines covered (58.2%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.67
/batch/scheduler.go
1
package batch
2

3
import (
4
        "context"
5
        "sync"
6
        "time"
7

8
        "github.com/lightningnetwork/lnd/sqldb"
9
)
10

11
// TimeScheduler is a batching engine that executes requests within a fixed
12
// horizon. When the first request is received, a TimeScheduler waits a
13
// configurable duration for other concurrent requests to join the batch. Once
14
// this time has elapsed, the batch is closed and executed. Subsequent requests
15
// are then added to a new batch which undergoes the same process.
16
type TimeScheduler[Q any] struct {
17
        db       sqldb.BatchedTx[Q]
18
        locker   sync.Locker
19
        duration time.Duration
20

21
        mu sync.Mutex
22
        b  *batch[Q]
23
}
24

25
// NewTimeScheduler initializes a new TimeScheduler with a fixed duration at
26
// which to schedule batches. If the operation needs to modify a higher-level
27
        // cache, the cache's lock should be provided so that external consistency
28
// can be maintained, as successful db operations will cause a request's
29
// OnCommit method to be executed while holding this lock.
30
func NewTimeScheduler[Q any](db sqldb.BatchedTx[Q], locker sync.Locker,
31
        duration time.Duration) *TimeScheduler[Q] {
3✔
32

3✔
33
        return &TimeScheduler[Q]{
3✔
34
                db:       db,
3✔
35
                locker:   locker,
3✔
36
                duration: duration,
3✔
37
        }
3✔
38
}
3✔
39

40
// Execute schedules the provided request for batch execution along with other
41
// concurrent requests. The request will be executed within a fixed horizon,
42
        // parameterized by the duration of the scheduler. The error from the
43
// underlying operation is returned to the caller.
44
//
45
// NOTE: Part of the Scheduler interface.
46
func (s *TimeScheduler[Q]) Execute(ctx context.Context, r *Request[Q]) error {
3✔
47
        if r.Opts == nil {
3✔
UNCOV
48
                r.Opts = NewDefaultSchedulerOpts()
×
UNCOV
49
        }
×
50

51
        req := request[Q]{
3✔
52
                Request: r,
3✔
53
                errChan: make(chan error, 1),
3✔
54
        }
3✔
55

3✔
56
        // Add the request to the current batch. If the batch has been cleared
3✔
57
        // or no batch exists, create a new one.
3✔
58
        s.mu.Lock()
3✔
59
        if s.b == nil {
6✔
60
                s.b = &batch[Q]{
3✔
61
                        db:     s.db,
3✔
62
                        clear:  s.clear,
3✔
63
                        locker: s.locker,
3✔
64

3✔
65
                        // By default, we assume that the batch is read-only,
3✔
66
                        // and we only upgrade it to read-write if a request
3✔
67
                        // is added that is not read-only.
3✔
68
                        txOpts: sqldb.ReadTxOpt(),
3✔
69
                }
3✔
70
                trigger := s.b.trigger
3✔
71
                time.AfterFunc(s.duration, func() {
6✔
72
                        trigger(ctx)
3✔
73
                })
3✔
74
        }
75
        s.b.reqs = append(s.b.reqs, &req)
3✔
76

3✔
77
        // We only upgrade the batch to read-write if the new request is not
3✔
78
        // read-only. If it is already read-write, we don't need to do anything.
3✔
79
        if s.b.txOpts.ReadOnly() && !r.Opts.ReadOnly {
6✔
80
                s.b.txOpts = sqldb.WriteTxOpt()
3✔
81
        }
3✔
82

83
        // If this is a non-lazy request, we'll execute the batch immediately.
84
        if !r.Opts.Lazy {
6✔
85
                go s.b.trigger(ctx)
3✔
86
        }
3✔
87

88
        // We need to grab the batch's txOpts before we unlock the scheduler's
89
        // mutex, since the batch may be set to nil before we access the
90
        // txOpts below.
91
        txOpts := s.b.txOpts
3✔
92

3✔
93
        s.mu.Unlock()
3✔
94

3✔
95
        // Wait for the batch to process the request. If the batch didn't
3✔
96
        // ask us to execute the request individually, simply return the error.
3✔
97
        err := <-req.errChan
3✔
98
        if err != errSolo {
6✔
99
                return err
3✔
100
        }
3✔
101

102
        // Obtain exclusive access to the cache if this scheduler needs to
103
        // modify the cache in OnCommit.
UNCOV
104
        if s.locker != nil {
×
UNCOV
105
                s.locker.Lock()
×
UNCOV
106
                defer s.locker.Unlock()
×
UNCOV
107
        }
×
108

109
        // Otherwise, run the request on its own.
UNCOV
110
        commitErr := s.db.ExecTx(ctx, txOpts, func(tx Q) error {
×
UNCOV
111
                return req.Do(tx)
×
UNCOV
112
        }, func() {
×
UNCOV
113
                if req.Reset != nil {
×
UNCOV
114
                        req.Reset()
×
UNCOV
115
                }
×
116
        })
117

118
        // Finally, return the commit error directly or execute the OnCommit
119
        // closure with the commit error if present.
UNCOV
120
        if req.OnCommit != nil {
×
UNCOV
121
                return req.OnCommit(commitErr)
×
UNCOV
122
        }
×
123

UNCOV
124
        return commitErr
×
125
}
126

127
// clear resets the scheduler's batch to nil so that no more requests can be
128
// added to it. It is a no-op if b is no longer the current batch.
129
func (s *TimeScheduler[Q]) clear(b *batch[Q]) {
3✔
130
        s.mu.Lock()
3✔
131
        if s.b == b {
6✔
132
                s.b = nil
3✔
133
        }
3✔
134
        s.mu.Unlock()
3✔
135
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc