• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15295138404

28 May 2025 08:19AM UTC coverage: 68.5% (+10.1%) from 58.362%
15295138404

Pull #9873

github

web-flow
Merge 9cbc1f804 into 8e96bd030
Pull Request #9873: sqldb: re-usable TxOptions and NoOpReset

40 of 54 new or added lines in 7 files covered. (74.07%)

49 existing lines in 11 files now uncovered.

134003 of 195625 relevant lines covered (68.5%)

21907.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.53
/subscribe/subscribe.go
1
package subscribe
2

3
import (
4
        "errors"
5
        "sync"
6
        "sync/atomic"
7

8
        "github.com/lightningnetwork/lnd/queue"
9
)
10

11
// ErrServerShuttingDown is an error returned in case the server is in the
12
// process of shutting down.
13
var ErrServerShuttingDown = errors.New("subscription server shutting down")
14

15
// Client is used to get notified about updates the caller has subscribed to,
16
type Client struct {
17
        // cancel should be called in case the client no longer wants to
18
        // subscribe for updates from the server.
19
        cancel func()
20

21
        updates *queue.ConcurrentQueue
22
        quit    chan struct{}
23
}
24

25
// Updates returns a read-only channel where the updates the client has
26
// subscribed to will be delivered.
27
func (c *Client) Updates() <-chan interface{} {
150,012✔
28
        return c.updates.ChanOut()
150,012✔
29
}
150,012✔
30

31
// Quit is a channel that will be closed in case the server decides to no
32
// longer deliver updates to this client.
33
func (c *Client) Quit() <-chan struct{} {
103✔
34
        return c.quit
103✔
35
}
103✔
36

37
// Cancel should be called in case the client no longer wants to
38
// subscribe for updates from the server.
39
func (c *Client) Cancel() {
109✔
40
        c.cancel()
109✔
41
}
109✔
42

43
// Server is a struct that manages a set of subscriptions and their
44
// corresponding clients. Any update will be delivered to all active clients.
45
type Server struct {
46
        clientCounter uint64 // To be used atomically.
47

48
        started uint32 // To be used atomically.
49
        stopped uint32 // To be used atomically.
50

51
        clients       map[uint64]*Client
52
        clientUpdates chan *clientUpdate
53

54
        updates chan interface{}
55

56
        quit chan struct{}
57
        wg   sync.WaitGroup
58
}
59

60
// clientUpdate is an internal message sent to the subscriptionHandler to
61
// either register a new client for subscription or cancel an existing
62
// subscription.
63
type clientUpdate struct {
64
        // cancel indicates if the update to the client is cancelling an
65
        // existing client's subscription. If not then this update will be to
66
        // subscribe a new client.
67
        cancel bool
68

69
        // clientID is the unique identifier for this client. Any further
70
        // updates (deleting or adding) to this notification client will be
71
        // dispatched according to the target clientID.
72
        clientID uint64
73

74
        // client is the new client that will receive updates. Will be nil in
75
        // case this is a cancellation update.
76
        client *Client
77
}
78

79
// NewServer returns a new Server.
80
func NewServer() *Server {
29✔
81
        return &Server{
29✔
82
                clients:       make(map[uint64]*Client),
29✔
83
                clientUpdates: make(chan *clientUpdate),
29✔
84
                updates:       make(chan interface{}),
29✔
85
                quit:          make(chan struct{}),
29✔
86
        }
29✔
87
}
29✔
88

89
// Start starts the Server, making it ready to accept subscriptions and
90
// updates.
91
func (s *Server) Start() error {
29✔
92
        if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
29✔
93
                return nil
×
94
        }
×
95

96
        s.wg.Add(1)
29✔
97
        go s.subscriptionHandler()
29✔
98

29✔
99
        return nil
29✔
100
}
101

102
// Stop stops the server.
103
func (s *Server) Stop() error {
28✔
104
        if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
28✔
105
                return nil
×
106
        }
×
107

108
        close(s.quit)
28✔
109
        s.wg.Wait()
28✔
110

28✔
111
        return nil
28✔
112
}
113

114
// Subscribe returns a Client that will receive updates any time the Server is
115
// made aware of a new event.
116
func (s *Server) Subscribe() (*Client, error) {
312✔
117
        // We'll first atomically obtain the next ID for this client from the
312✔
118
        // incrementing client ID counter.
312✔
119
        clientID := atomic.AddUint64(&s.clientCounter, 1)
312✔
120

312✔
121
        // Create the client that will be returned. The Cancel method is
312✔
122
        // populated to send the cancellation intent to the
312✔
123
        // subscriptionHandler.
312✔
124
        client := &Client{
312✔
125
                updates: queue.NewConcurrentQueue(20),
312✔
126
                quit:    make(chan struct{}),
312✔
127
                cancel: func() {
421✔
128
                        select {
109✔
129
                        case s.clientUpdates <- &clientUpdate{
130
                                cancel:   true,
131
                                clientID: clientID,
132
                        }:
109✔
133
                        case <-s.quit:
3✔
134
                                return
3✔
135
                        }
136
                },
137
        }
138

139
        select {
312✔
140
        case s.clientUpdates <- &clientUpdate{
141
                cancel:   false,
142
                clientID: clientID,
143
                client:   client,
144
        }:
312✔
145
        case <-s.quit:
3✔
146
                return nil, ErrServerShuttingDown
3✔
147
        }
148

149
        return client, nil
312✔
150
}
151

152
// SendUpdate is called to send the passed update to all currently active
153
// subscription clients.
154
func (s *Server) SendUpdate(update interface{}) error {
1,021✔
155

1,021✔
156
        select {
1,021✔
157
        case s.updates <- update:
1,021✔
158
                return nil
1,021✔
159
        case <-s.quit:
×
160
                return ErrServerShuttingDown
×
161
        }
162
}
163

164
// subscriptionHandler is the main handler for the Server. It will handle
165
// incoming updates and subscriptions, and forward the incoming updates to the
166
// registered clients.
167
//
168
// NOTE: MUST be run as a goroutine.
169
func (s *Server) subscriptionHandler() {
29✔
170
        defer s.wg.Done()
29✔
171

29✔
172
        for {
1,491✔
173
                select {
1,462✔
174

175
                // If a client update is received, the either a new
176
                // subscription becomes active, or we cancel and existing one.
177
                case update := <-s.clientUpdates:
418✔
178
                        clientID := update.clientID
418✔
179

418✔
180
                        // In case this is a cancellation, stop the client's
418✔
181
                        // underlying queue, and remove the client from the set
418✔
182
                        // of active subscription clients.
418✔
183
                        if update.cancel {
527✔
184
                                client, ok := s.clients[update.clientID]
109✔
185
                                if ok {
218✔
186
                                        client.updates.Stop()
109✔
187
                                        close(client.quit)
109✔
188
                                        delete(s.clients, clientID)
109✔
189
                                }
109✔
190

191
                                continue
109✔
192
                        }
193

194
                        // If this was not a cancellation, start the underlying
195
                        // queue and add the client to our set of subscription
196
                        // clients. It will be notified about any new updates
197
                        // the server receives.
198
                        update.client.updates.Start()
312✔
199
                        s.clients[update.clientID] = update.client
312✔
200

201
                // A new update was received, forward it to all active clients.
202
                case upd := <-s.updates:
1,021✔
203
                        for _, client := range s.clients {
201,042✔
204
                                select {
200,021✔
205
                                case client.updates.ChanIn() <- upd:
200,021✔
206
                                case <-client.quit:
×
UNCOV
207
                                case <-s.quit:
×
UNCOV
208
                                        return
×
209
                                }
210
                        }
211

212
                // In case the server is shutting down, stop the clients and
213
                // close the quit channels to notify them.
214
                case <-s.quit:
28✔
215
                        for _, client := range s.clients {
34✔
216
                                client.updates.Stop()
6✔
217
                                close(client.quit)
6✔
218
                        }
6✔
219
                        return
28✔
220
                }
221
        }
222
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc