• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13593508312

28 Feb 2025 05:41PM UTC coverage: 58.287% (-10.4%) from 68.65%
13593508312

Pull #9458

github

web-flow
Merge d40067c0c into f1182e433
Pull Request #9458: multi+server.go: add initial permissions for some peers

346 of 548 new or added lines in 10 files covered. (63.14%)

27412 existing lines in 442 files now uncovered.

94709 of 162488 relevant lines covered (58.29%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.14
/msgmux/msg_router.go
1
package msgmux
2

3
import (
4
        "fmt"
5
        "maps"
6
        "sync"
7

8
        "github.com/btcsuite/btcd/btcec/v2"
9
        "github.com/lightningnetwork/lnd/fn/v2"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
)
12

13
var (
14
        // ErrDuplicateEndpoint is returned when an attempt is made to register
15
        // an endpoint under a name that is already registered.
16
        ErrDuplicateEndpoint = fmt.Errorf("endpoint already registered")
17

18
        // ErrUnableToRouteMsg is returned when no registered endpoint was able
19
        // to handle a routed message.
20
        ErrUnableToRouteMsg = fmt.Errorf("unable to route message")
21
)
22

23
// EndpointName is the name of a given endpoint (an alias for string). This
24
// MUST be unique across all registered endpoints.
25
type EndpointName = string
26

27
// PeerMsg wraps a wire message together with the public key of the peer that
28
// sent it.
29
type PeerMsg struct {
30
        lnwire.Message
31

32
        // PeerPub is the public key of the peer that sent this message.
33
        PeerPub btcec.PublicKey
34
}
35

36
// Endpoint is an interface that represents a message endpoint, or the
37
// sub-system that will handle processing an incoming wire message.
38
type Endpoint interface {
39
        // Name returns the name of this endpoint. This MUST be unique across
40
        // all registered endpoints.
41
        Name() EndpointName
42

43
        // CanHandle returns true if the target message can be routed to this
44
        // endpoint.
45
        CanHandle(msg PeerMsg) bool
46

47
        // SendMessage handles the target message, and returns true if the
48
        // message was able to be processed.
49
        SendMessage(msg PeerMsg) bool
50
}
51

52
// Router is an interface that represents a message router, which is a generic
53
// sub-system capable of routing any incoming wire message to a set of
54
// registered endpoints.
55
type Router interface {
56
        // RegisterEndpoint registers a new endpoint with the router. If a
57
        // duplicate endpoint exists, an error is returned.
58
        RegisterEndpoint(Endpoint) error
59

60
        // UnregisterEndpoint unregisters the target endpoint from the router.
61
        UnregisterEndpoint(EndpointName) error
62

63
        // RouteMsg attempts to route the target message to a registered
64
        // endpoint. If ANY endpoint could handle the message, then nil is
65
        // returned. Otherwise, ErrUnableToRouteMsg is returned.
66
        RouteMsg(PeerMsg) error
67

68
        // Start starts the peer message router.
69
        Start()
70

71
        // Stop stops the peer message router.
72
        Stop()
73
}
74

75
// sendQuery sends a query to the main event loop, and returns the response.
76
func sendQuery[Q any, R any](sendChan chan fn.Req[Q, R], queryArg Q,
77
        quit chan struct{}) fn.Result[R] {
3✔
78

3✔
79
        query, respChan := fn.NewReq[Q, R](queryArg)
3✔
80

3✔
81
        if !fn.SendOrQuit(sendChan, query, quit) {
3✔
82
                return fn.Errf[R]("router shutting down")
×
83
        }
×
84

85
        return fn.NewResult(fn.RecvResp(respChan, nil, quit))
3✔
86
}
87

88
// sendQueryErr is a helper function based on sendQuery that can be used when
89
// the query only needs an error response.
90
func sendQueryErr[Q any](sendChan chan fn.Req[Q, error], queryArg Q,
91
        quitChan chan struct{}) error {
3✔
92

3✔
93
        return fn.ElimEither(
3✔
94
                sendQuery(sendChan, queryArg, quitChan).Either,
3✔
95
                fn.Iden, fn.Iden,
3✔
96
        )
3✔
97
}
3✔
98

99
// EndpointsMap is a map of all registered endpoints, keyed by endpoint name.
100
type EndpointsMap map[EndpointName]Endpoint
101

102
// MultiMsgRouter is a type of message router that is capable of routing new
103
// incoming messages, permitting a message to be routed to multiple registered
104
// endpoints.
105
type MultiMsgRouter struct {
106
        // startOnce and stopOnce ensure the bodies of Start and Stop each run
        // at most once.
        startOnce sync.Once
107
        stopOnce  sync.Once
108

109
        // registerChan is the channel that all new endpoints will be sent to.
110
        registerChan chan fn.Req[Endpoint, error]
111

112
        // unregisterChan is the channel that all endpoints that are to be
113
        // removed are sent to.
114
        unregisterChan chan fn.Req[EndpointName, error]
115

116
        // msgChan is the channel that all messages will be sent to for
117
        // processing.
118
        msgChan chan fn.Req[PeerMsg, error]
119

120
        // endpointQueries is a channel that all queries to the endpoints map
121
        // will be sent to.
122
        endpointQueries chan fn.Req[Endpoint, EndpointsMap]
123

124
        // wg tracks the msgRouter goroutine launched by Start, and quit is
        // closed by Stop to signal that goroutine to exit.
        wg   sync.WaitGroup
125
        quit chan struct{}
126
}
127

128
// NewMultiMsgRouter creates a new instance of a peer message router.
129
func NewMultiMsgRouter() *MultiMsgRouter {
3✔
130
        return &MultiMsgRouter{
3✔
131
                registerChan:    make(chan fn.Req[Endpoint, error]),
3✔
132
                unregisterChan:  make(chan fn.Req[EndpointName, error]),
3✔
133
                msgChan:         make(chan fn.Req[PeerMsg, error]),
3✔
134
                endpointQueries: make(chan fn.Req[Endpoint, EndpointsMap]),
3✔
135
                quit:            make(chan struct{}),
3✔
136
        }
3✔
137
}
3✔
138

139
// Start starts the peer message router.
140
func (p *MultiMsgRouter) Start() {
3✔
141
        log.Infof("Starting Router")
3✔
142

3✔
143
        p.startOnce.Do(func() {
6✔
144
                p.wg.Add(1)
3✔
145
                go p.msgRouter()
3✔
146
        })
3✔
147
}
148

149
// Stop stops the peer message router.
150
func (p *MultiMsgRouter) Stop() {
3✔
151
        log.Infof("Stopping Router")
3✔
152

3✔
153
        p.stopOnce.Do(func() {
6✔
154
                close(p.quit)
3✔
155
                p.wg.Wait()
3✔
156
        })
3✔
157
}
158

159
// RegisterEndpoint registers a new endpoint with the router by sending it to
160
// the main event loop over registerChan. If an endpoint with the same name is
// already registered, ErrDuplicateEndpoint is returned. An error may also be
// returned if the request cannot be delivered to the event loop.
UNCOV
161
func (p *MultiMsgRouter) RegisterEndpoint(endpoint Endpoint) error {
×
UNCOV
162
        return sendQueryErr(p.registerChan, endpoint, p.quit)
×
UNCOV
163
}
×
164

165
// UnregisterEndpoint unregisters the target endpoint from the router. The
// event loop deletes the name from its map and resolves the request with nil
// whether or not the name was registered. An error may still be returned if
// the request cannot be delivered to the event loop.
UNCOV
166
func (p *MultiMsgRouter) UnregisterEndpoint(name EndpointName) error {
×
UNCOV
167
        return sendQueryErr(p.unregisterChan, name, p.quit)
×
UNCOV
168
}
×
169

170
// RouteMsg attempts to route the target message to a registered endpoint. If
171
// ANY endpoint could handle the message, then nil is returned. Otherwise,
// ErrUnableToRouteMsg is returned. An error may also be returned if the
// request cannot be delivered to the event loop.
172
func (p *MultiMsgRouter) RouteMsg(msg PeerMsg) error {
3✔
173
        return sendQueryErr(p.msgChan, msg, p.quit)
3✔
174
}
3✔
175

176
// endpoints returns a copy of the map of all registered endpoints, as
// maintained by the main event loop. The nil query argument carries no
// information; the event loop does not read it.
UNCOV
177
func (p *MultiMsgRouter) endpoints() fn.Result[EndpointsMap] {
×
UNCOV
178
        return sendQuery(p.endpointQueries, nil, p.quit)
×
UNCOV
179
}
×
180

181
// msgRouter is the main goroutine that handles all incoming messages.
182
func (p *MultiMsgRouter) msgRouter() {
3✔
183
        defer p.wg.Done()
3✔
184

3✔
185
        // endpoints is a map of all registered endpoints.
3✔
186
        endpoints := make(map[EndpointName]Endpoint)
3✔
187

3✔
188
        for {
6✔
189
                select {
3✔
190
                // A new endpoint was just sent in, so we'll add it to our set
191
                // of registered endpoints.
UNCOV
192
                case newEndpointMsg := <-p.registerChan:
×
UNCOV
193
                        endpoint := newEndpointMsg.Request
×
UNCOV
194

×
UNCOV
195
                        log.Infof("MsgRouter: registering new "+
×
UNCOV
196
                                "Endpoint(%s)", endpoint.Name())
×
UNCOV
197

×
UNCOV
198
                        // If this endpoint already exists, then we'll return
×
UNCOV
199
                        // an error as we require unique names.
×
UNCOV
200
                        if _, ok := endpoints[endpoint.Name()]; ok {
×
UNCOV
201
                                log.Errorf("MsgRouter: rejecting "+
×
UNCOV
202
                                        "duplicate endpoint: %v",
×
UNCOV
203
                                        endpoint.Name())
×
UNCOV
204

×
UNCOV
205
                                newEndpointMsg.Resolve(ErrDuplicateEndpoint)
×
UNCOV
206

×
UNCOV
207
                                continue
×
208
                        }
209

UNCOV
210
                        endpoints[endpoint.Name()] = endpoint
×
UNCOV
211

×
UNCOV
212
                        newEndpointMsg.Resolve(nil)
×
213

214
                // A request to unregister an endpoint was just sent in, so
215
                // we'll attempt to remove it.
UNCOV
216
                case endpointName := <-p.unregisterChan:
×
UNCOV
217
                        delete(endpoints, endpointName.Request)
×
UNCOV
218

×
UNCOV
219
                        log.Infof("MsgRouter: unregistering "+
×
UNCOV
220
                                "Endpoint(%s)", endpointName.Request)
×
UNCOV
221

×
UNCOV
222
                        endpointName.Resolve(nil)
×
223

224
                // A new message was just sent in. We'll attempt to route it to
225
                // all the endpoints that can handle it.
226
                case msgQuery := <-p.msgChan:
3✔
227
                        msg := msgQuery.Request
3✔
228

3✔
229
                        // Loop through all the endpoints and send the message
3✔
230
                        // to those that can handle it the message.
3✔
231
                        var couldSend bool
3✔
232
                        for _, endpoint := range endpoints {
3✔
UNCOV
233
                                if endpoint.CanHandle(msg) {
×
UNCOV
234
                                        log.Tracef("MsgRouter: sending "+
×
UNCOV
235
                                                "msg %T to endpoint %s", msg,
×
UNCOV
236
                                                endpoint.Name())
×
UNCOV
237

×
UNCOV
238
                                        sent := endpoint.SendMessage(msg)
×
UNCOV
239
                                        couldSend = couldSend || sent
×
UNCOV
240
                                }
×
241
                        }
242

243
                        var err error
3✔
244
                        if !couldSend {
6✔
245
                                log.Tracef("MsgRouter: unable to route "+
3✔
246
                                        "msg %T", msg)
3✔
247

3✔
248
                                err = ErrUnableToRouteMsg
3✔
249
                        }
3✔
250

251
                        msgQuery.Resolve(err)
3✔
252

253
                // A query for the endpoint state just came in, we'll send back
254
                // a copy of our current state.
UNCOV
255
                case endpointQuery := <-p.endpointQueries:
×
UNCOV
256
                        endpointsCopy := make(EndpointsMap, len(endpoints))
×
UNCOV
257
                        maps.Copy(endpointsCopy, endpoints)
×
UNCOV
258

×
UNCOV
259
                        endpointQuery.Resolve(endpointsCopy)
×
260

261
                case <-p.quit:
3✔
262
                        return
3✔
263
                }
264
        }
265
}
266

267
// A compile time check to ensure MultiMsgRouter implements the Router
268
// interface.
269
var _ Router = (*MultiMsgRouter)(nil)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc