• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13408822928

19 Feb 2025 08:59AM UTC coverage: 41.123% (-17.7%) from 58.794%
13408822928

Pull #9521

github

web-flow
Merge d2f397b3c into 0e8786348
Pull Request #9521: unit: remove GOACC, use Go 1.20 native coverage functionality

92496 of 224923 relevant lines covered (41.12%)

18825.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.68
/subscribe/subscribe.go
1
package subscribe
2

3
import (
4
        "errors"
5
        "sync"
6
        "sync/atomic"
7

8
        "github.com/lightningnetwork/lnd/queue"
9
)
10

11
// ErrServerShuttingDown is the error handed back to callers whose request
// could not be served because the subscription server is shutting down.
var ErrServerShuttingDown = errors.New(
	"subscription server shutting down",
)
14

15
// Client is used to get notified about updates the caller has subscribed to,
16
type Client struct {
17
        // cancel should be called in case the client no longer wants to
18
        // subscribe for updates from the server.
19
        cancel func()
20

21
        updates *queue.ConcurrentQueue
22
        quit    chan struct{}
23
}
24

25
// Updates returns a read-only channel where the updates the client has
26
// subscribed to will be delivered.
27
func (c *Client) Updates() <-chan interface{} {
150,000✔
28
        return c.updates.ChanOut()
150,000✔
29
}
150,000✔
30

31
// Quit is a channel that will be closed in case the server decides to no
32
// longer deliver updates to this client.
33
func (c *Client) Quit() <-chan struct{} {
100✔
34
        return c.quit
100✔
35
}
100✔
36

37
// Cancel should be called in case the client no longer wants to
38
// subscribe for updates from the server.
39
func (c *Client) Cancel() {
100✔
40
        c.cancel()
100✔
41
}
100✔
42

43
// Server is a struct that manages a set of subscriptions and their
44
// corresponding clients. Any update will be delivered to all active clients.
45
type Server struct {
46
        clientCounter uint64 // To be used atomically.
47

48
        started uint32 // To be used atomically.
49
        stopped uint32 // To be used atomically.
50

51
        clients       map[uint64]*Client
52
        clientUpdates chan *clientUpdate
53

54
        updates chan interface{}
55

56
        quit chan struct{}
57
        wg   sync.WaitGroup
58
}
59

60
// clientUpdate is an internal message sent to the subscriptionHandler to
61
// either register a new client for subscription or cancel an existing
62
// subscription.
63
type clientUpdate struct {
64
        // cancel indicates if the update to the client is cancelling an
65
        // existing client's subscription. If not then this update will be to
66
        // subscribe a new client.
67
        cancel bool
68

69
        // clientID is the unique identifier for this client. Any further
70
        // updates (deleting or adding) to this notification client will be
71
        // dispatched according to the target clientID.
72
        clientID uint64
73

74
        // client is the new client that will receive updates. Will be nil in
75
        // case this is a cancellation update.
76
        client *Client
77
}
78

79
// NewServer returns a new Server.
80
func NewServer() *Server {
1✔
81
        return &Server{
1✔
82
                clients:       make(map[uint64]*Client),
1✔
83
                clientUpdates: make(chan *clientUpdate),
1✔
84
                updates:       make(chan interface{}),
1✔
85
                quit:          make(chan struct{}),
1✔
86
        }
1✔
87
}
1✔
88

89
// Start starts the Server, making it ready to accept subscriptions and
90
// updates.
91
func (s *Server) Start() error {
1✔
92
        if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
1✔
93
                return nil
×
94
        }
×
95

96
        s.wg.Add(1)
1✔
97
        go s.subscriptionHandler()
1✔
98

1✔
99
        return nil
1✔
100
}
101

102
// Stop stops the server.
103
func (s *Server) Stop() error {
×
104
        if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
×
105
                return nil
×
106
        }
×
107

108
        close(s.quit)
×
109
        s.wg.Wait()
×
110

×
111
        return nil
×
112
}
113

114
// Subscribe returns a Client that will receive updates any time the Server is
115
// made aware of a new event.
116
func (s *Server) Subscribe() (*Client, error) {
300✔
117
        // We'll first atomically obtain the next ID for this client from the
300✔
118
        // incrementing client ID counter.
300✔
119
        clientID := atomic.AddUint64(&s.clientCounter, 1)
300✔
120

300✔
121
        // Create the client that will be returned. The Cancel method is
300✔
122
        // populated to send the cancellation intent to the
300✔
123
        // subscriptionHandler.
300✔
124
        client := &Client{
300✔
125
                updates: queue.NewConcurrentQueue(20),
300✔
126
                quit:    make(chan struct{}),
300✔
127
                cancel: func() {
400✔
128
                        select {
100✔
129
                        case s.clientUpdates <- &clientUpdate{
130
                                cancel:   true,
131
                                clientID: clientID,
132
                        }:
100✔
133
                        case <-s.quit:
×
134
                                return
×
135
                        }
136
                },
137
        }
138

139
        select {
300✔
140
        case s.clientUpdates <- &clientUpdate{
141
                cancel:   false,
142
                clientID: clientID,
143
                client:   client,
144
        }:
300✔
145
        case <-s.quit:
×
146
                return nil, ErrServerShuttingDown
×
147
        }
148

149
        return client, nil
300✔
150
}
151

152
// SendUpdate is called to send the passed update to all currently active
153
// subscription clients.
154
func (s *Server) SendUpdate(update interface{}) error {
1,000✔
155

1,000✔
156
        select {
1,000✔
157
        case s.updates <- update:
1,000✔
158
                return nil
1,000✔
159
        case <-s.quit:
×
160
                return ErrServerShuttingDown
×
161
        }
162
}
163

164
// subscriptionHandler is the main handler for the Server. It will handle
165
// incoming updates and subscriptions, and forward the incoming updates to the
166
// registered clients.
167
//
168
// NOTE: MUST be run as a goroutine.
169
func (s *Server) subscriptionHandler() {
1✔
170
        defer s.wg.Done()
1✔
171

1✔
172
        for {
1,402✔
173
                select {
1,401✔
174

175
                // If a client update is received, the either a new
176
                // subscription becomes active, or we cancel and existing one.
177
                case update := <-s.clientUpdates:
400✔
178
                        clientID := update.clientID
400✔
179

400✔
180
                        // In case this is a cancellation, stop the client's
400✔
181
                        // underlying queue, and remove the client from the set
400✔
182
                        // of active subscription clients.
400✔
183
                        if update.cancel {
500✔
184
                                client, ok := s.clients[update.clientID]
100✔
185
                                if ok {
200✔
186
                                        client.updates.Stop()
100✔
187
                                        close(client.quit)
100✔
188
                                        delete(s.clients, clientID)
100✔
189
                                }
100✔
190

191
                                continue
100✔
192
                        }
193

194
                        // If this was not a cancellation, start the underlying
195
                        // queue and add the client to our set of subscription
196
                        // clients. It will be notified about any new updates
197
                        // the server receives.
198
                        update.client.updates.Start()
300✔
199
                        s.clients[update.clientID] = update.client
300✔
200

201
                // A new update was received, forward it to all active clients.
202
                case upd := <-s.updates:
1,000✔
203
                        for _, client := range s.clients {
201,000✔
204
                                select {
200,000✔
205
                                case client.updates.ChanIn() <- upd:
200,000✔
206
                                case <-client.quit:
×
207
                                case <-s.quit:
×
208
                                        return
×
209
                                }
210
                        }
211

212
                // In case the server is shutting down, stop the clients and
213
                // close the quit channels to notify them.
214
                case <-s.quit:
×
215
                        for _, client := range s.clients {
×
216
                                client.updates.Stop()
×
217
                                close(client.quit)
×
218
                        }
×
219
                        return
×
220
                }
221
        }
222
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc