• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15298164175

28 May 2025 10:45AM UTC coverage: 58.327% (-0.04%) from 58.362%
15298164175

push

github

web-flow
Merge pull request #9873 from ellemouton/sqldbHelpers

sqldb: re-usable TxOptions and NoOpReset

4 of 38 new or added lines in 4 files covered. (10.53%)

96 existing lines in 16 files now uncovered.

97406 of 167000 relevant lines covered (58.33%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.53
/subscribe/subscribe.go
1
package subscribe
2

3
import (
4
        "errors"
5
        "sync"
6
        "sync/atomic"
7

8
        "github.com/lightningnetwork/lnd/queue"
9
)
10

11
// ErrServerShuttingDown is the sentinel error handed back to callers whose
// request could not be serviced because the subscription server is in the
// middle of shutting down.
var ErrServerShuttingDown = errors.New("subscription server shutting down")
14

15
// Client is used to get notified about updates the caller has subscribed to,
16
type Client struct {
17
        // cancel should be called in case the client no longer wants to
18
        // subscribe for updates from the server.
19
        cancel func()
20

21
        updates *queue.ConcurrentQueue
22
        quit    chan struct{}
23
}
24

25
// Updates returns a read-only channel where the updates the client has
26
// subscribed to will be delivered.
27
func (c *Client) Updates() <-chan interface{} {
3✔
28
        return c.updates.ChanOut()
3✔
29
}
3✔
30

31
// Quit is a channel that will be closed in case the server decides to no
32
// longer deliver updates to this client.
33
func (c *Client) Quit() <-chan struct{} {
3✔
34
        return c.quit
3✔
35
}
3✔
36

37
// Cancel should be called in case the client no longer wants to
38
// subscribe for updates from the server.
39
func (c *Client) Cancel() {
3✔
40
        c.cancel()
3✔
41
}
3✔
42

43
// Server is a struct that manages a set of subscriptions and their
44
// corresponding clients. Any update will be delivered to all active clients.
45
type Server struct {
46
        clientCounter uint64 // To be used atomically.
47

48
        started uint32 // To be used atomically.
49
        stopped uint32 // To be used atomically.
50

51
        clients       map[uint64]*Client
52
        clientUpdates chan *clientUpdate
53

54
        updates chan interface{}
55

56
        quit chan struct{}
57
        wg   sync.WaitGroup
58
}
59

60
// clientUpdate is an internal message sent to the subscriptionHandler to
61
// either register a new client for subscription or cancel an existing
62
// subscription.
63
type clientUpdate struct {
64
        // cancel indicates if the update to the client is cancelling an
65
        // existing client's subscription. If not then this update will be to
66
        // subscribe a new client.
67
        cancel bool
68

69
        // clientID is the unique identifier for this client. Any further
70
        // updates (deleting or adding) to this notification client will be
71
        // dispatched according to the target clientID.
72
        clientID uint64
73

74
        // client is the new client that will receive updates. Will be nil in
75
        // case this is a cancellation update.
76
        client *Client
77
}
78

79
// NewServer returns a new Server.
80
func NewServer() *Server {
3✔
81
        return &Server{
3✔
82
                clients:       make(map[uint64]*Client),
3✔
83
                clientUpdates: make(chan *clientUpdate),
3✔
84
                updates:       make(chan interface{}),
3✔
85
                quit:          make(chan struct{}),
3✔
86
        }
3✔
87
}
3✔
88

89
// Start starts the Server, making it ready to accept subscriptions and
90
// updates.
91
func (s *Server) Start() error {
3✔
92
        if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
3✔
93
                return nil
×
94
        }
×
95

96
        s.wg.Add(1)
3✔
97
        go s.subscriptionHandler()
3✔
98

3✔
99
        return nil
3✔
100
}
101

102
// Stop stops the server.
103
func (s *Server) Stop() error {
3✔
104
        if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
3✔
105
                return nil
×
106
        }
×
107

108
        close(s.quit)
3✔
109
        s.wg.Wait()
3✔
110

3✔
111
        return nil
3✔
112
}
113

114
// Subscribe returns a Client that will receive updates any time the Server is
115
// made aware of a new event.
116
func (s *Server) Subscribe() (*Client, error) {
3✔
117
        // We'll first atomically obtain the next ID for this client from the
3✔
118
        // incrementing client ID counter.
3✔
119
        clientID := atomic.AddUint64(&s.clientCounter, 1)
3✔
120

3✔
121
        // Create the client that will be returned. The Cancel method is
3✔
122
        // populated to send the cancellation intent to the
3✔
123
        // subscriptionHandler.
3✔
124
        client := &Client{
3✔
125
                updates: queue.NewConcurrentQueue(20),
3✔
126
                quit:    make(chan struct{}),
3✔
127
                cancel: func() {
6✔
128
                        select {
3✔
129
                        case s.clientUpdates <- &clientUpdate{
130
                                cancel:   true,
131
                                clientID: clientID,
132
                        }:
3✔
133
                        case <-s.quit:
3✔
134
                                return
3✔
135
                        }
136
                },
137
        }
138

139
        select {
3✔
140
        case s.clientUpdates <- &clientUpdate{
141
                cancel:   false,
142
                clientID: clientID,
143
                client:   client,
144
        }:
3✔
145
        case <-s.quit:
2✔
146
                return nil, ErrServerShuttingDown
2✔
147
        }
148

149
        return client, nil
3✔
150
}
151

152
// SendUpdate is called to send the passed update to all currently active
153
// subscription clients.
154
func (s *Server) SendUpdate(update interface{}) error {
3✔
155

3✔
156
        select {
3✔
157
        case s.updates <- update:
3✔
158
                return nil
3✔
159
        case <-s.quit:
×
160
                return ErrServerShuttingDown
×
161
        }
162
}
163

164
// subscriptionHandler is the main handler for the Server. It will handle
165
// incoming updates and subscriptions, and forward the incoming updates to the
166
// registered clients.
167
//
168
// NOTE: MUST be run as a goroutine.
169
func (s *Server) subscriptionHandler() {
3✔
170
        defer s.wg.Done()
3✔
171

3✔
172
        for {
6✔
173
                select {
3✔
174

175
                // If a client update is received, the either a new
176
                // subscription becomes active, or we cancel and existing one.
177
                case update := <-s.clientUpdates:
3✔
178
                        clientID := update.clientID
3✔
179

3✔
180
                        // In case this is a cancellation, stop the client's
3✔
181
                        // underlying queue, and remove the client from the set
3✔
182
                        // of active subscription clients.
3✔
183
                        if update.cancel {
6✔
184
                                client, ok := s.clients[update.clientID]
3✔
185
                                if ok {
6✔
186
                                        client.updates.Stop()
3✔
187
                                        close(client.quit)
3✔
188
                                        delete(s.clients, clientID)
3✔
189
                                }
3✔
190

191
                                continue
3✔
192
                        }
193

194
                        // If this was not a cancellation, start the underlying
195
                        // queue and add the client to our set of subscription
196
                        // clients. It will be notified about any new updates
197
                        // the server receives.
198
                        update.client.updates.Start()
3✔
199
                        s.clients[update.clientID] = update.client
3✔
200

201
                // A new update was received, forward it to all active clients.
202
                case upd := <-s.updates:
3✔
203
                        for _, client := range s.clients {
6✔
204
                                select {
3✔
205
                                case client.updates.ChanIn() <- upd:
3✔
206
                                case <-client.quit:
×
UNCOV
207
                                case <-s.quit:
×
UNCOV
208
                                        return
×
209
                                }
210
                        }
211

212
                // In case the server is shutting down, stop the clients and
213
                // close the quit channels to notify them.
214
                case <-s.quit:
3✔
215
                        for _, client := range s.clients {
6✔
216
                                client.updates.Stop()
3✔
217
                                close(client.quit)
3✔
218
                        }
3✔
219
                        return
3✔
220
                }
221
        }
222
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc