• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12375116696

17 Dec 2024 02:29PM UTC coverage: 58.366% (-0.2%) from 58.595%
12375116696

Pull #8777

github

ziggie1984
docs: add release-notes
Pull Request #8777: multi: make deletion of edge atomic.

132 of 177 new or added lines in 6 files covered. (74.58%)

670 existing lines in 37 files now uncovered.

133926 of 229458 relevant lines covered (58.37%)

19223.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.51
/queue/gc_queue.go
1
package queue
2

3
import (
4
        "container/list"
5
        "time"
6

7
        "github.com/lightningnetwork/lnd/ticker"
8
)
9

10
// GCQueue is garbage collecting queue, which dynamically grows and contracts
11
// based on load. If the queue has items which have been returned, the queue
12
// will check every gcInterval amount of time to see if any elements are
13
// eligible to be released back to the runtime. Elements that have been in the
14
// queue for a duration of least expiryInterval will be released upon the next
15
// iteration of the garbage collection, thus the maximum amount of time an
16
// element remain in the queue is expiryInterval+gcInterval. The gc ticker will
17
// be disabled after all items in the queue have been taken or released to
18
// ensure that the GCQueue becomes quiescent, and imposes minimal overhead in
19
// the steady state.
20
type GCQueue struct {
21
        // takeBuffer coordinates the delivery of items taken from the queue
22
        // such that they are delivered to requesters.
23
        takeBuffer chan interface{}
24

25
        // returnBuffer coordinates the return of items back into the queue,
26
        // where they will be kept until retaken or released.
27
        returnBuffer chan interface{}
28

29
        // newItem is a constructor, used to generate new elements if none are
30
        // otherwise available for reuse.
31
        newItem func() interface{}
32

33
        // expiryInterval is the minimum amount of time an element will remain
34
        // in the queue before being released.
35
        expiryInterval time.Duration
36

37
        // recycleTicker is a resumable ticker used to trigger a sweep to
38
        // release elements that have been in the queue longer than
39
        // expiryInterval.
40
        recycleTicker ticker.Ticker
41

42
        // freeList maintains a list of gcQueueEntries, sorted in order of
43
        // increasing time of arrival.
44
        freeList *list.List
45

46
        quit chan struct{}
47
}
48

49
// NewGCQueue creates a new garbage collecting queue, which dynamically grows
50
// and contracts based on load. If the queue has items which have been returned,
51
// the queue will check every gcInterval amount of time to see if any elements
52
// are eligible to be released back to the runtime. Elements that have been in
53
// the queue for a duration of least expiryInterval will be released upon the
54
// next iteration of the garbage collection, thus the maximum amount of time an
55
// element remain in the queue is expiryInterval+gcInterval. The gc ticker will
56
// be disabled after all items in the queue have been taken or released to
57
// ensure that the GCQueue becomes quiescent, and imposes minimal overhead in
58
// the steady state. The returnQueueSize parameter is used to size the maximal
59
// number of items that can be returned without being dropped during large
60
// bursts in attempts to return items to the GCQUeue.
61
func NewGCQueue(newItem func() interface{}, returnQueueSize int,
62
        gcInterval, expiryInterval time.Duration) *GCQueue {
48✔
63

48✔
64
        q := &GCQueue{
48✔
65
                takeBuffer:     make(chan interface{}),
48✔
66
                returnBuffer:   make(chan interface{}, returnQueueSize),
48✔
67
                expiryInterval: expiryInterval,
48✔
68
                freeList:       list.New(),
48✔
69
                recycleTicker:  ticker.New(gcInterval),
48✔
70
                newItem:        newItem,
48✔
71
                quit:           make(chan struct{}),
48✔
72
        }
48✔
73

48✔
74
        go q.queueManager()
48✔
75

48✔
76
        return q
48✔
77
}
48✔
78

79
// Take returns either a recycled element from the queue, or creates a new item
80
// if none are available.
81
func (q *GCQueue) Take() interface{} {
70✔
82
        select {
70✔
83
        case item := <-q.takeBuffer:
70✔
84
                return item
70✔
UNCOV
85
        case <-time.After(time.Millisecond):
×
UNCOV
86
                return q.newItem()
×
87
        }
88
}
89

90
// Return adds the returned item to freelist if the queue's returnBuffer has
91
// available capacity. Under load, items may be dropped to ensure this method
92
// does not block.
93
func (q *GCQueue) Return(item interface{}) {
54✔
94
        select {
54✔
95
        case q.returnBuffer <- item:
54✔
96
        default:
×
97
        }
98
}
99

100
// gcQueueEntry pairs an element parked on the free list with the moment it was
// parked. The timestamp lets the sweep decide when an entry that has not been
// retaken is stale and can be released.
type gcQueueEntry struct {
	// item is the parked element.
	item interface{}

	// time is when the element was added to the free list.
	time time.Time
}
107

108
// queueManager maintains the free list of elements by popping the head of the
109
// queue when items are needed, and appending them to the end of the queue when
110
// items are returned. The queueManager will periodically attempt to release any
111
// items that have been in the queue longer than the expiry interval.
112
//
113
// NOTE: This method SHOULD be run as a goroutine.
114
func (q *GCQueue) queueManager() {
48✔
115
        for {
229✔
116
                // If the pool is empty, initialize a buffer pool to serve a
181✔
117
                // client that takes a buffer immediately. If this happens, this
181✔
118
                // is either:
181✔
119
                //   1) the first iteration of the loop,
181✔
120
                //   2) after all entries were garbage collected, or
181✔
121
                //   3) the freelist was emptied after the last entry was taken.
181✔
122
                //
181✔
123
                // In all of these cases, it is safe to pause the recycle ticker
181✔
124
                // since it will be resumed as soon an entry is returned to the
181✔
125
                // freelist.
181✔
126
                if q.freeList.Len() == 0 {
302✔
127
                        q.freeList.PushBack(gcQueueEntry{
121✔
128
                                item: q.newItem(),
121✔
129
                                time: time.Now(),
121✔
130
                        })
121✔
131

121✔
132
                        q.recycleTicker.Pause()
121✔
133
                }
121✔
134

135
                next := q.freeList.Front()
181✔
136

181✔
137
                select {
181✔
138

139
                // If a client requests a new write buffer, deliver the buffer
140
                // at the head of the freelist to them.
141
                case q.takeBuffer <- next.Value.(gcQueueEntry).item:
70✔
142
                        q.freeList.Remove(next)
70✔
143

144
                // If a client is returning a write buffer, add it to the free
145
                // list and resume the recycle ticker so that it can be cleared
146
                // if the entries are not quickly reused.
147
                case item := <-q.returnBuffer:
54✔
148
                        // Add the returned buffer to the freelist, recording
54✔
149
                        // the current time so we can determine when the entry
54✔
150
                        // expires.
54✔
151
                        q.freeList.PushBack(gcQueueEntry{
54✔
152
                                item: item,
54✔
153
                                time: time.Now(),
54✔
154
                        })
54✔
155

54✔
156
                        // Adding the buffer implies that we now have a non-zero
54✔
157
                        // number of elements in the free list. Resume the
54✔
158
                        // recycle ticker to cleanup any entries that go unused.
54✔
159
                        q.recycleTicker.Resume()
54✔
160

161
                // If the recycle ticker fires, we will aggressively release any
162
                // write buffers in the freelist for which the expiryInterval
163
                // has elapsed since their insertion. If after doing so, no
164
                // elements remain, we will pause the recycle ticker.
165
                case <-q.recycleTicker.Ticks():
9✔
166
                        // Since the insert time of all entries will be
9✔
167
                        // monotonically increasing, iterate over elements and
9✔
168
                        // remove all entries that have expired.
9✔
169
                        var next *list.Element
9✔
170
                        for e := q.freeList.Front(); e != nil; e = next {
55✔
171
                                // Cache the next element, since it will become
46✔
172
                                // unreachable from the current element if it is
46✔
173
                                // removed.
46✔
174
                                next = e.Next()
46✔
175
                                entry := e.Value.(gcQueueEntry)
46✔
176

46✔
177
                                // Use now - insertTime <= expiryInterval to
46✔
178
                                // determine if this entry has not expired.
46✔
179
                                if time.Since(entry.time) <= q.expiryInterval {
48✔
180
                                        // If this entry hasn't expired, then
2✔
181
                                        // all entries that follow will still be
2✔
182
                                        // valid.
2✔
183
                                        break
2✔
184
                                }
185

186
                                // Otherwise, remove the expired entry from the
187
                                // linked-list.
188
                                q.freeList.Remove(e)
44✔
189
                                entry.item = nil
44✔
190
                                e.Value = nil
44✔
191
                        }
192
                }
193
        }
194
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc