• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14358372723

09 Apr 2025 01:26PM UTC coverage: 56.696% (-12.3%) from 69.037%
14358372723

Pull #9696

github

web-flow
Merge e2837e400 into 867d27d68
Pull Request #9696: Add `development_guidelines.md` for both human and machine

107055 of 188823 relevant lines covered (56.7%)

22721.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.32
/subscribe/subscribe.go
1
package subscribe
2

3
import (
4
        "errors"
5
        "sync"
6
        "sync/atomic"
7

8
        "github.com/lightningnetwork/lnd/queue"
9
)
10

11
// ErrServerShuttingDown is an error returned in case the server is in the
12
// process of shutting down.
13
var ErrServerShuttingDown = errors.New(
	// Handed back by Subscribe and SendUpdate when the quit channel wins
	// their select.
	"subscription server shutting down",
)
14

15
// Client is used to get notified about updates the caller has subscribed to,
16
type Client struct {
17
        // cancel should be called in case the client no longer wants to
18
        // subscribe for updates from the server.
19
        cancel func()
20

21
        updates *queue.ConcurrentQueue
22
        quit    chan struct{}
23
}
24

25
// Updates returns a read-only channel where the updates the client has
26
// subscribed to will be delivered.
27
func (c *Client) Updates() <-chan interface{} {
150,009✔
28
        return c.updates.ChanOut()
150,009✔
29
}
150,009✔
30

31
// Quit is a channel that will be closed in case the server decides to no
32
// longer deliver updates to this client.
33
func (c *Client) Quit() <-chan struct{} {
100✔
34
        return c.quit
100✔
35
}
100✔
36

37
// Cancel should be called in case the client no longer wants to
38
// subscribe for updates from the server.
39
func (c *Client) Cancel() {
106✔
40
        c.cancel()
106✔
41
}
106✔
42

43
// Server is a struct that manages a set of subscriptions and their
44
// corresponding clients. Any update will be delivered to all active clients.
45
type Server struct {
46
        clientCounter uint64 // To be used atomically.
47

48
        started uint32 // To be used atomically.
49
        stopped uint32 // To be used atomically.
50

51
        clients       map[uint64]*Client
52
        clientUpdates chan *clientUpdate
53

54
        updates chan interface{}
55

56
        quit chan struct{}
57
        wg   sync.WaitGroup
58
}
59

60
// clientUpdate is an internal message sent to the subscriptionHandler to
61
// either register a new client for subscription or cancel an existing
62
// subscription.
63
type clientUpdate struct {
64
        // cancel indicates if the update to the client is cancelling an
65
        // existing client's subscription. If not then this update will be to
66
        // subscribe a new client.
67
        cancel bool
68

69
        // clientID is the unique identifier for this client. Any further
70
        // updates (deleting or adding) to this notification client will be
71
        // dispatched according to the target clientID.
72
        clientID uint64
73

74
        // client is the new client that will receive updates. Will be nil in
75
        // case this is a cancellation update.
76
        client *Client
77
}
78

79
// NewServer returns a new Server.
80
func NewServer() *Server {
26✔
81
        return &Server{
26✔
82
                clients:       make(map[uint64]*Client),
26✔
83
                clientUpdates: make(chan *clientUpdate),
26✔
84
                updates:       make(chan interface{}),
26✔
85
                quit:          make(chan struct{}),
26✔
86
        }
26✔
87
}
26✔
88

89
// Start starts the Server, making it ready to accept subscriptions and
90
// updates.
91
func (s *Server) Start() error {
26✔
92
        if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
26✔
93
                return nil
×
94
        }
×
95

96
        s.wg.Add(1)
26✔
97
        go s.subscriptionHandler()
26✔
98

26✔
99
        return nil
26✔
100
}
101

102
// Stop stops the server.
103
func (s *Server) Stop() error {
25✔
104
        if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
25✔
105
                return nil
×
106
        }
×
107

108
        close(s.quit)
25✔
109
        s.wg.Wait()
25✔
110

25✔
111
        return nil
25✔
112
}
113

114
// Subscribe returns a Client that will receive updates any time the Server is
115
// made aware of a new event.
116
func (s *Server) Subscribe() (*Client, error) {
309✔
117
        // We'll first atomically obtain the next ID for this client from the
309✔
118
        // incrementing client ID counter.
309✔
119
        clientID := atomic.AddUint64(&s.clientCounter, 1)
309✔
120

309✔
121
        // Create the client that will be returned. The Cancel method is
309✔
122
        // populated to send the cancellation intent to the
309✔
123
        // subscriptionHandler.
309✔
124
        client := &Client{
309✔
125
                updates: queue.NewConcurrentQueue(20),
309✔
126
                quit:    make(chan struct{}),
309✔
127
                cancel: func() {
415✔
128
                        select {
106✔
129
                        case s.clientUpdates <- &clientUpdate{
130
                                cancel:   true,
131
                                clientID: clientID,
132
                        }:
106✔
133
                        case <-s.quit:
×
134
                                return
×
135
                        }
136
                },
137
        }
138

139
        select {
309✔
140
        case s.clientUpdates <- &clientUpdate{
141
                cancel:   false,
142
                clientID: clientID,
143
                client:   client,
144
        }:
309✔
145
        case <-s.quit:
×
146
                return nil, ErrServerShuttingDown
×
147
        }
148

149
        return client, nil
309✔
150
}
151

152
// SendUpdate is called to send the passed update to all currently active
153
// subscription clients.
154
func (s *Server) SendUpdate(update interface{}) error {
1,018✔
155

1,018✔
156
        select {
1,018✔
157
        case s.updates <- update:
1,018✔
158
                return nil
1,018✔
159
        case <-s.quit:
×
160
                return ErrServerShuttingDown
×
161
        }
162
}
163

164
// subscriptionHandler is the main handler for the Server. It will handle
165
// incoming updates and subscriptions, and forward the incoming updates to the
166
// registered clients.
167
//
168
// NOTE: MUST be run as a goroutine.
169
func (s *Server) subscriptionHandler() {
26✔
170
        defer s.wg.Done()
26✔
171

26✔
172
        for {
1,485✔
173
                select {
1,459✔
174

175
                // If a client update is received, the either a new
176
                // subscription becomes active, or we cancel and existing one.
177
                case update := <-s.clientUpdates:
415✔
178
                        clientID := update.clientID
415✔
179

415✔
180
                        // In case this is a cancellation, stop the client's
415✔
181
                        // underlying queue, and remove the client from the set
415✔
182
                        // of active subscription clients.
415✔
183
                        if update.cancel {
521✔
184
                                client, ok := s.clients[update.clientID]
106✔
185
                                if ok {
212✔
186
                                        client.updates.Stop()
106✔
187
                                        close(client.quit)
106✔
188
                                        delete(s.clients, clientID)
106✔
189
                                }
106✔
190

191
                                continue
106✔
192
                        }
193

194
                        // If this was not a cancellation, start the underlying
195
                        // queue and add the client to our set of subscription
196
                        // clients. It will be notified about any new updates
197
                        // the server receives.
198
                        update.client.updates.Start()
309✔
199
                        s.clients[update.clientID] = update.client
309✔
200

201
                // A new update was received, forward it to all active clients.
202
                case upd := <-s.updates:
1,018✔
203
                        for _, client := range s.clients {
201,036✔
204
                                select {
200,018✔
205
                                case client.updates.ChanIn() <- upd:
200,018✔
206
                                case <-client.quit:
×
207
                                case <-s.quit:
×
208
                                        return
×
209
                                }
210
                        }
211

212
                // In case the server is shutting down, stop the clients and
213
                // close the quit channels to notify them.
214
                case <-s.quit:
25✔
215
                        for _, client := range s.clients {
28✔
216
                                client.updates.Stop()
3✔
217
                                close(client.quit)
3✔
218
                        }
3✔
219
                        return
25✔
220
                }
221
        }
222
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc