• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13157733617

05 Feb 2025 12:49PM UTC coverage: 57.712% (-1.1%) from 58.82%
13157733617

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19472 existing lines in 252 files now uncovered.

103634 of 179570 relevant lines covered (57.71%)

24840.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.51
/queue/gc_queue.go
1
package queue
2

3
import (
4
        "container/list"
5
        "time"
6

7
        "github.com/lightningnetwork/lnd/ticker"
8
)
9

10
// GCQueue is garbage collecting queue, which dynamically grows and contracts
11
// based on load. If the queue has items which have been returned, the queue
12
// will check every gcInterval amount of time to see if any elements are
13
// eligible to be released back to the runtime. Elements that have been in the
14
// queue for a duration of least expiryInterval will be released upon the next
15
// iteration of the garbage collection, thus the maximum amount of time an
16
// element remain in the queue is expiryInterval+gcInterval. The gc ticker will
17
// be disabled after all items in the queue have been taken or released to
18
// ensure that the GCQueue becomes quiescent, and imposes minimal overhead in
19
// the steady state.
20
type GCQueue struct {
21
        // takeBuffer coordinates the delivery of items taken from the queue
22
        // such that they are delivered to requesters.
23
        takeBuffer chan interface{}
24

25
        // returnBuffer coordinates the return of items back into the queue,
26
        // where they will be kept until retaken or released.
27
        returnBuffer chan interface{}
28

29
        // newItem is a constructor, used to generate new elements if none are
30
        // otherwise available for reuse.
31
        newItem func() interface{}
32

33
        // expiryInterval is the minimum amount of time an element will remain
34
        // in the queue before being released.
35
        expiryInterval time.Duration
36

37
        // recycleTicker is a resumable ticker used to trigger a sweep to
38
        // release elements that have been in the queue longer than
39
        // expiryInterval.
40
        recycleTicker ticker.Ticker
41

42
        // freeList maintains a list of gcQueueEntries, sorted in order of
43
        // increasing time of arrival.
44
        freeList *list.List
45

46
        quit chan struct{}
47
}
48

49
// NewGCQueue creates a new garbage collecting queue, which dynamically grows
50
// and contracts based on load. If the queue has items which have been returned,
51
// the queue will check every gcInterval amount of time to see if any elements
52
// are eligible to be released back to the runtime. Elements that have been in
53
// the queue for a duration of least expiryInterval will be released upon the
54
// next iteration of the garbage collection, thus the maximum amount of time an
55
// element remain in the queue is expiryInterval+gcInterval. The gc ticker will
56
// be disabled after all items in the queue have been taken or released to
57
// ensure that the GCQueue becomes quiescent, and imposes minimal overhead in
58
// the steady state. The returnQueueSize parameter is used to size the maximal
59
// number of items that can be returned without being dropped during large
60
// bursts in attempts to return items to the GCQUeue.
61
func NewGCQueue(newItem func() interface{}, returnQueueSize int,
62
        gcInterval, expiryInterval time.Duration) *GCQueue {
48✔
63

48✔
64
        q := &GCQueue{
48✔
65
                takeBuffer:     make(chan interface{}),
48✔
66
                returnBuffer:   make(chan interface{}, returnQueueSize),
48✔
67
                expiryInterval: expiryInterval,
48✔
68
                freeList:       list.New(),
48✔
69
                recycleTicker:  ticker.New(gcInterval),
48✔
70
                newItem:        newItem,
48✔
71
                quit:           make(chan struct{}),
48✔
72
        }
48✔
73

48✔
74
        go q.queueManager()
48✔
75

48✔
76
        return q
48✔
77
}
48✔
78

79
// Take returns either a recycled element from the queue, or creates a new item
80
// if none are available.
81
func (q *GCQueue) Take() interface{} {
70✔
82
        select {
70✔
83
        case item := <-q.takeBuffer:
70✔
84
                return item
70✔
UNCOV
85
        case <-time.After(time.Millisecond):
×
UNCOV
86
                return q.newItem()
×
87
        }
88
}
89

90
// Return adds the returned item to freelist if the queue's returnBuffer has
91
// available capacity. Under load, items may be dropped to ensure this method
92
// does not block.
93
func (q *GCQueue) Return(item interface{}) {
54✔
94
        select {
54✔
95
        case q.returnBuffer <- item:
54✔
96
        default:
×
97
        }
98
}
99

100
// gcQueueEntry is a tuple containing an interface{} and the time at which the
101
// item was added to the queue. The recorded time is used to determine when the
102
// entry becomes stale, and can be released if it has not already been taken.
103
type gcQueueEntry struct {
104
        item interface{}
105
        time time.Time
106
}
107

108
// queueManager maintains the free list of elements by popping the head of the
109
// queue when items are needed, and appending them to the end of the queue when
110
// items are returned. The queueManager will periodically attempt to release any
111
// items that have been in the queue longer than the expiry interval.
112
//
113
// NOTE: This method SHOULD be run as a goroutine.
114
func (q *GCQueue) queueManager() {
48✔
115
        for {
229✔
116
                // If the pool is empty, initialize a buffer pool to serve a
181✔
117
                // client that takes a buffer immediately. If this happens, this
181✔
118
                // is either:
181✔
119
                //   1) the first iteration of the loop,
181✔
120
                //   2) after all entries were garbage collected, or
181✔
121
                //   3) the freelist was emptied after the last entry was taken.
181✔
122
                //
181✔
123
                // In all of these cases, it is safe to pause the recycle ticker
181✔
124
                // since it will be resumed as soon an entry is returned to the
181✔
125
                // freelist.
181✔
126
                if q.freeList.Len() == 0 {
303✔
127
                        q.freeList.PushBack(gcQueueEntry{
122✔
128
                                item: q.newItem(),
122✔
129
                                time: time.Now(),
122✔
130
                        })
122✔
131

122✔
132
                        q.recycleTicker.Pause()
122✔
133
                }
122✔
134

135
                next := q.freeList.Front()
181✔
136

181✔
137
                select {
181✔
138

139
                // If a client requests a new write buffer, deliver the buffer
140
                // at the head of the freelist to them.
141
                case q.takeBuffer <- next.Value.(gcQueueEntry).item:
70✔
142
                        q.freeList.Remove(next)
70✔
143

144
                // If a client is returning a write buffer, add it to the free
145
                // list and resume the recycle ticker so that it can be cleared
146
                // if the entries are not quickly reused.
147
                case item := <-q.returnBuffer:
54✔
148
                        // Add the returned buffer to the freelist, recording
54✔
149
                        // the current time so we can determine when the entry
54✔
150
                        // expires.
54✔
151
                        q.freeList.PushBack(gcQueueEntry{
54✔
152
                                item: item,
54✔
153
                                time: time.Now(),
54✔
154
                        })
54✔
155

54✔
156
                        // Adding the buffer implies that we now have a non-zero
54✔
157
                        // number of elements in the free list. Resume the
54✔
158
                        // recycle ticker to cleanup any entries that go unused.
54✔
159
                        q.recycleTicker.Resume()
54✔
160

161
                // If the recycle ticker fires, we will aggressively release any
162
                // write buffers in the freelist for which the expiryInterval
163
                // has elapsed since their insertion. If after doing so, no
164
                // elements remain, we will pause the recycle ticker.
165
                case <-q.recycleTicker.Ticks():
9✔
166
                        // Since the insert time of all entries will be
9✔
167
                        // monotonically increasing, iterate over elements and
9✔
168
                        // remove all entries that have expired.
9✔
169
                        var next *list.Element
9✔
170
                        for e := q.freeList.Front(); e != nil; e = next {
54✔
171
                                // Cache the next element, since it will become
45✔
172
                                // unreachable from the current element if it is
45✔
173
                                // removed.
45✔
174
                                next = e.Next()
45✔
175
                                entry := e.Value.(gcQueueEntry)
45✔
176

45✔
177
                                // Use now - insertTime <= expiryInterval to
45✔
178
                                // determine if this entry has not expired.
45✔
179
                                if time.Since(entry.time) <= q.expiryInterval {
46✔
180
                                        // If this entry hasn't expired, then
1✔
181
                                        // all entries that follow will still be
1✔
182
                                        // valid.
1✔
183
                                        break
1✔
184
                                }
185

186
                                // Otherwise, remove the expired entry from the
187
                                // linked-list.
188
                                q.freeList.Remove(e)
44✔
189
                                entry.item = nil
44✔
190
                                e.Value = nil
44✔
191
                        }
192
                }
193
        }
194
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc