• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12986279612

27 Jan 2025 09:51AM UTC coverage: 57.652% (-1.1%) from 58.788%
12986279612

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19578 existing lines in 256 files now uncovered.

103448 of 179434 relevant lines covered (57.65%)

24884.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.32
/subscribe/subscribe.go
1
package subscribe
2

3
import (
4
        "errors"
5
        "sync"
6
        "sync/atomic"
7

8
        "github.com/lightningnetwork/lnd/queue"
9
)
10

11
// ErrServerShuttingDown is an error returned in case the server is in the
12
// process of shutting down.
13
var ErrServerShuttingDown = errors.New("subscription server shutting down")
14

15
// Client is used to get notified about updates the caller has subscribed to,
16
type Client struct {
17
        // cancel should be called in case the client no longer wants to
18
        // subscribe for updates from the server.
19
        cancel func()
20

21
        updates *queue.ConcurrentQueue
22
        quit    chan struct{}
23
}
24

25
// Updates returns a read-only channel where the updates the client has
26
// subscribed to will be delivered.
27
func (c *Client) Updates() <-chan interface{} {
150,009✔
28
        return c.updates.ChanOut()
150,009✔
29
}
150,009✔
30

31
// Quit is a channel that will be closed in case the server decides to no
32
// longer deliver updates to this client.
33
func (c *Client) Quit() <-chan struct{} {
100✔
34
        return c.quit
100✔
35
}
100✔
36

37
// Cancel should be called in case the client no longer wants to
38
// subscribe for updates from the server.
39
func (c *Client) Cancel() {
106✔
40
        c.cancel()
106✔
41
}
106✔
42

43
// Server is a struct that manages a set of subscriptions and their
44
// corresponding clients. Any update will be delivered to all active clients.
45
type Server struct {
46
        clientCounter uint64 // To be used atomically.
47

48
        started uint32 // To be used atomically.
49
        stopped uint32 // To be used atomically.
50

51
        clients       map[uint64]*Client
52
        clientUpdates chan *clientUpdate
53

54
        updates chan interface{}
55

56
        quit chan struct{}
57
        wg   sync.WaitGroup
58
}
59

60
// clientUpdate is an internal message sent to the subscriptionHandler to
61
// either register a new client for subscription or cancel an existing
62
// subscription.
63
type clientUpdate struct {
64
        // cancel indicates if the update to the client is cancelling an
65
        // existing client's subscription. If not then this update will be to
66
        // subscribe a new client.
67
        cancel bool
68

69
        // clientID is the unique identifier for this client. Any further
70
        // updates (deleting or adding) to this notification client will be
71
        // dispatched according to the target clientID.
72
        clientID uint64
73

74
        // client is the new client that will receive updates. Will be nil in
75
        // case this is a cancellation update.
76
        client *Client
77
}
78

79
// NewServer returns a new Server.
80
func NewServer() *Server {
26✔
81
        return &Server{
26✔
82
                clients:       make(map[uint64]*Client),
26✔
83
                clientUpdates: make(chan *clientUpdate),
26✔
84
                updates:       make(chan interface{}),
26✔
85
                quit:          make(chan struct{}),
26✔
86
        }
26✔
87
}
26✔
88

89
// Start starts the Server, making it ready to accept subscriptions and
90
// updates.
91
func (s *Server) Start() error {
26✔
92
        if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
26✔
93
                return nil
×
94
        }
×
95

96
        s.wg.Add(1)
26✔
97
        go s.subscriptionHandler()
26✔
98

26✔
99
        return nil
26✔
100
}
101

102
// Stop stops the server.
103
func (s *Server) Stop() error {
25✔
104
        if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
25✔
105
                return nil
×
106
        }
×
107

108
        close(s.quit)
25✔
109
        s.wg.Wait()
25✔
110

25✔
111
        return nil
25✔
112
}
113

114
// Subscribe returns a Client that will receive updates any time the Server is
115
// made aware of a new event.
116
func (s *Server) Subscribe() (*Client, error) {
309✔
117
        // We'll first atomically obtain the next ID for this client from the
309✔
118
        // incrementing client ID counter.
309✔
119
        clientID := atomic.AddUint64(&s.clientCounter, 1)
309✔
120

309✔
121
        // Create the client that will be returned. The Cancel method is
309✔
122
        // populated to send the cancellation intent to the
309✔
123
        // subscriptionHandler.
309✔
124
        client := &Client{
309✔
125
                updates: queue.NewConcurrentQueue(20),
309✔
126
                quit:    make(chan struct{}),
309✔
127
                cancel: func() {
415✔
128
                        select {
106✔
129
                        case s.clientUpdates <- &clientUpdate{
130
                                cancel:   true,
131
                                clientID: clientID,
132
                        }:
106✔
UNCOV
133
                        case <-s.quit:
×
UNCOV
134
                                return
×
135
                        }
136
                },
137
        }
138

139
        select {
309✔
140
        case s.clientUpdates <- &clientUpdate{
141
                cancel:   false,
142
                clientID: clientID,
143
                client:   client,
144
        }:
309✔
UNCOV
145
        case <-s.quit:
×
UNCOV
146
                return nil, ErrServerShuttingDown
×
147
        }
148

149
        return client, nil
309✔
150
}
151

152
// SendUpdate is called to send the passed update to all currently active
153
// subscription clients.
154
func (s *Server) SendUpdate(update interface{}) error {
1,018✔
155

1,018✔
156
        select {
1,018✔
157
        case s.updates <- update:
1,018✔
158
                return nil
1,018✔
159
        case <-s.quit:
×
160
                return ErrServerShuttingDown
×
161
        }
162
}
163

164
// subscriptionHandler is the main handler for the Server. It will handle
165
// incoming updates and subscriptions, and forward the incoming updates to the
166
// registered clients.
167
//
168
// NOTE: MUST be run as a goroutine.
169
func (s *Server) subscriptionHandler() {
26✔
170
        defer s.wg.Done()
26✔
171

26✔
172
        for {
1,485✔
173
                select {
1,459✔
174

175
                // If a client update is received, the either a new
176
                // subscription becomes active, or we cancel and existing one.
177
                case update := <-s.clientUpdates:
415✔
178
                        clientID := update.clientID
415✔
179

415✔
180
                        // In case this is a cancellation, stop the client's
415✔
181
                        // underlying queue, and remove the client from the set
415✔
182
                        // of active subscription clients.
415✔
183
                        if update.cancel {
521✔
184
                                client, ok := s.clients[update.clientID]
106✔
185
                                if ok {
212✔
186
                                        client.updates.Stop()
106✔
187
                                        close(client.quit)
106✔
188
                                        delete(s.clients, clientID)
106✔
189
                                }
106✔
190

191
                                continue
106✔
192
                        }
193

194
                        // If this was not a cancellation, start the underlying
195
                        // queue and add the client to our set of subscription
196
                        // clients. It will be notified about any new updates
197
                        // the server receives.
198
                        update.client.updates.Start()
309✔
199
                        s.clients[update.clientID] = update.client
309✔
200

201
                // A new update was received, forward it to all active clients.
202
                case upd := <-s.updates:
1,018✔
203
                        for _, client := range s.clients {
201,036✔
204
                                select {
200,018✔
205
                                case client.updates.ChanIn() <- upd:
200,018✔
206
                                case <-client.quit:
×
207
                                case <-s.quit:
×
208
                                        return
×
209
                                }
210
                        }
211

212
                // In case the server is shutting down, stop the clients and
213
                // close the quit channels to notify them.
214
                case <-s.quit:
25✔
215
                        for _, client := range s.clients {
28✔
216
                                client.updates.Stop()
3✔
217
                                close(client.quit)
3✔
218
                        }
3✔
219
                        return
25✔
220
                }
221
        }
222
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc