• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.14
/msgmux/msg_router.go
1
package msgmux
2

3
import (
4
        "fmt"
5
        "maps"
6
        "sync"
7

8
        "github.com/btcsuite/btcd/btcec/v2"
9
        "github.com/lightningnetwork/lnd/fn/v2"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
)
12

13
var (
14
        // ErrDuplicateEndpoint is returned when an endpoint is registered with
15
        // a name that already exists.
16
        ErrDuplicateEndpoint = fmt.Errorf("endpoint already registered")
17

18
        // ErrUnableToRouteMsg is returned when a message is unable to be
19
        // routed to any endpoints.
20
        ErrUnableToRouteMsg = fmt.Errorf("unable to route message")
21
)
22

23
// EndpointName is the name of a given endpoint. This MUST be unique across all
24
// registered endpoints.
25
type EndpointName = string
26

27
// PeerMsg is a wire message that includes the public key of the peer that sent
28
// it.
29
type PeerMsg struct {
30
        lnwire.Message
31

32
        // PeerPub is the public key of the peer that sent this message.
33
        PeerPub btcec.PublicKey
34
}
35

36
// Endpoint is an interface that represents a message endpoint, or the
37
// sub-system that will handle processing an incoming wire message.
38
type Endpoint interface {
39
        // Name returns the name of this endpoint. This MUST be unique across
40
        // all registered endpoints.
41
        Name() EndpointName
42

43
        // CanHandle returns true if the target message can be routed to this
44
        // endpoint.
45
        CanHandle(msg PeerMsg) bool
46

47
        // SendMessage handles the target message, and returns true if the
48
        // message was able to be processed.
49
        SendMessage(msg PeerMsg) bool
50
}
51

52
// Router is an interface that represents a message router, which is a generic
53
// sub-system capable of routing any incoming wire message to a set of
54
// registered endpoints.
55
type Router interface {
56
        // RegisterEndpoint registers a new endpoint with the router. If a
57
        // duplicate endpoint exists, an error is returned.
58
        RegisterEndpoint(Endpoint) error
59

60
        // UnregisterEndpoint unregisters the target endpoint from the router.
61
        UnregisterEndpoint(EndpointName) error
62

63
        // RouteMsg attempts to route the target message to a registered
64
        // endpoint. If ANY endpoint could handle the message, then nil is
65
        // returned. Otherwise, ErrUnableToRouteMsg is returned.
66
        RouteMsg(PeerMsg) error
67

68
        // Start starts the peer message router.
69
        Start()
70

71
        // Stop stops the peer message router.
72
        Stop()
73
}
74

75
// sendQuery sends a query to the main event loop, and returns the response.
76
func sendQuery[Q any, R any](sendChan chan fn.Req[Q, R], queryArg Q,
77
        quit chan struct{}) fn.Result[R] {
3✔
78

3✔
79
        query, respChan := fn.NewReq[Q, R](queryArg)
3✔
80

3✔
81
        if !fn.SendOrQuit(sendChan, query, quit) {
3✔
82
                return fn.Errf[R]("router shutting down")
×
83
        }
×
84

85
        return fn.NewResult(fn.RecvResp(respChan, nil, quit))
3✔
86
}
87

88
// sendQueryErr is a helper function based on sendQuery that can be used when
89
// the query only needs an error response.
90
func sendQueryErr[Q any](sendChan chan fn.Req[Q, error], queryArg Q,
91
        quitChan chan struct{}) error {
3✔
92

3✔
93
        return fn.ElimEither(
3✔
94
                sendQuery(sendChan, queryArg, quitChan).Either,
3✔
95
                fn.Iden, fn.Iden,
3✔
96
        )
3✔
97
}
3✔
98

99
// EndpointsMap is a map of all registered endpoints.
100
type EndpointsMap map[EndpointName]Endpoint
101

102
// MultiMsgRouter is a type of message router that is capable of routing new
103
// incoming messages, permitting a message to be routed to multiple registered
104
// endpoints.
105
type MultiMsgRouter struct {
106
        startOnce sync.Once
107
        stopOnce  sync.Once
108

109
        // registerChan is the channel that all new endpoints will be sent to.
110
        registerChan chan fn.Req[Endpoint, error]
111

112
        // unregisterChan is the channel that all endpoints that are to be
113
        // removed are sent to.
114
        unregisterChan chan fn.Req[EndpointName, error]
115

116
        // msgChan is the channel that all messages will be sent to for
117
        // processing.
118
        msgChan chan fn.Req[PeerMsg, error]
119

120
        // endpointQueries is a channel that all queries to the endpoints map
121
        // will be sent to.
122
        endpointQueries chan fn.Req[Endpoint, EndpointsMap]
123

124
        wg   sync.WaitGroup
125
        quit chan struct{}
126
}
127

128
// NewMultiMsgRouter creates a new instance of a peer message router.
129
func NewMultiMsgRouter() *MultiMsgRouter {
3✔
130
        return &MultiMsgRouter{
3✔
131
                registerChan:    make(chan fn.Req[Endpoint, error]),
3✔
132
                unregisterChan:  make(chan fn.Req[EndpointName, error]),
3✔
133
                msgChan:         make(chan fn.Req[PeerMsg, error]),
3✔
134
                endpointQueries: make(chan fn.Req[Endpoint, EndpointsMap]),
3✔
135
                quit:            make(chan struct{}),
3✔
136
        }
3✔
137
}
3✔
138

139
// Start starts the peer message router.
140
func (p *MultiMsgRouter) Start() {
3✔
141
        log.Infof("Starting Router")
3✔
142

3✔
143
        p.startOnce.Do(func() {
6✔
144
                p.wg.Add(1)
3✔
145
                go p.msgRouter()
3✔
146
        })
3✔
147
}
148

149
// Stop stops the peer message router.
150
func (p *MultiMsgRouter) Stop() {
3✔
151
        log.Infof("Stopping Router")
3✔
152

3✔
153
        p.stopOnce.Do(func() {
6✔
154
                close(p.quit)
3✔
155
                p.wg.Wait()
3✔
156
        })
3✔
157
}
158

159
// RegisterEndpoint registers a new endpoint with the router. If a duplicate
160
// endpoint exists, an error is returned.
UNCOV
161
func (p *MultiMsgRouter) RegisterEndpoint(endpoint Endpoint) error {
×
UNCOV
162
        return sendQueryErr(p.registerChan, endpoint, p.quit)
×
UNCOV
163
}
×
164

165
// UnregisterEndpoint unregisters the target endpoint from the router.
UNCOV
166
func (p *MultiMsgRouter) UnregisterEndpoint(name EndpointName) error {
×
UNCOV
167
        return sendQueryErr(p.unregisterChan, name, p.quit)
×
UNCOV
168
}
×
169

170
// RouteMsg attempts to route the target message to a registered endpoint. If
171
// ANY endpoint could handle the message, then nil is returned.
172
func (p *MultiMsgRouter) RouteMsg(msg PeerMsg) error {
3✔
173
        return sendQueryErr(p.msgChan, msg, p.quit)
3✔
174
}
3✔
175

176
// endpoints returns a copy of the map of all registered endpoints.
UNCOV
177
func (p *MultiMsgRouter) endpoints() fn.Result[EndpointsMap] {
×
UNCOV
178
        return sendQuery(p.endpointQueries, nil, p.quit)
×
UNCOV
179
}
×
180

181
// msgRouter is the main goroutine that handles all incoming messages.
182
func (p *MultiMsgRouter) msgRouter() {
3✔
183
        defer p.wg.Done()
3✔
184

3✔
185
        // endpoints is a map of all registered endpoints.
3✔
186
        endpoints := make(map[EndpointName]Endpoint)
3✔
187

3✔
188
        for {
6✔
189
                select {
3✔
190
                // A new endpoint was just sent in, so we'll add it to our set
191
                // of registered endpoints.
UNCOV
192
                case newEndpointMsg := <-p.registerChan:
×
UNCOV
193
                        endpoint := newEndpointMsg.Request
×
UNCOV
194

×
UNCOV
195
                        log.Infof("MsgRouter: registering new "+
×
UNCOV
196
                                "Endpoint(%s)", endpoint.Name())
×
UNCOV
197

×
UNCOV
198
                        // If this endpoint already exists, then we'll return
×
UNCOV
199
                        // an error as we require unique names.
×
UNCOV
200
                        if _, ok := endpoints[endpoint.Name()]; ok {
×
UNCOV
201
                                log.Errorf("MsgRouter: rejecting "+
×
UNCOV
202
                                        "duplicate endpoint: %v",
×
UNCOV
203
                                        endpoint.Name())
×
UNCOV
204

×
UNCOV
205
                                newEndpointMsg.Resolve(ErrDuplicateEndpoint)
×
UNCOV
206

×
UNCOV
207
                                continue
×
208
                        }
209

UNCOV
210
                        endpoints[endpoint.Name()] = endpoint
×
UNCOV
211

×
UNCOV
212
                        newEndpointMsg.Resolve(nil)
×
213

214
                // A request to unregister an endpoint was just sent in, so
215
                // we'll attempt to remove it.
UNCOV
216
                case endpointName := <-p.unregisterChan:
×
UNCOV
217
                        delete(endpoints, endpointName.Request)
×
UNCOV
218

×
UNCOV
219
                        log.Infof("MsgRouter: unregistering "+
×
UNCOV
220
                                "Endpoint(%s)", endpointName.Request)
×
UNCOV
221

×
UNCOV
222
                        endpointName.Resolve(nil)
×
223

224
                // A new message was just sent in. We'll attempt to route it to
225
                // all the endpoints that can handle it.
226
                case msgQuery := <-p.msgChan:
3✔
227
                        msg := msgQuery.Request
3✔
228

3✔
229
                        // Loop through all the endpoints and send the message
3✔
230
                        // to those that can handle it the message.
3✔
231
                        var couldSend bool
3✔
232
                        for _, endpoint := range endpoints {
3✔
UNCOV
233
                                if endpoint.CanHandle(msg) {
×
UNCOV
234
                                        log.Tracef("MsgRouter: sending "+
×
UNCOV
235
                                                "msg %T to endpoint %s", msg,
×
UNCOV
236
                                                endpoint.Name())
×
UNCOV
237

×
UNCOV
238
                                        sent := endpoint.SendMessage(msg)
×
UNCOV
239
                                        couldSend = couldSend || sent
×
UNCOV
240
                                }
×
241
                        }
242

243
                        var err error
3✔
244
                        if !couldSend {
6✔
245
                                log.Tracef("MsgRouter: unable to route "+
3✔
246
                                        "msg %T", msg)
3✔
247

3✔
248
                                err = ErrUnableToRouteMsg
3✔
249
                        }
3✔
250

251
                        msgQuery.Resolve(err)
3✔
252

253
                // A query for the endpoint state just came in, we'll send back
254
                // a copy of our current state.
UNCOV
255
                case endpointQuery := <-p.endpointQueries:
×
UNCOV
256
                        endpointsCopy := make(EndpointsMap, len(endpoints))
×
UNCOV
257
                        maps.Copy(endpointsCopy, endpoints)
×
UNCOV
258

×
UNCOV
259
                        endpointQuery.Resolve(endpointsCopy)
×
260

261
                case <-p.quit:
3✔
262
                        return
3✔
263
                }
264
        }
265
}
266

267
// A compile time check to ensure MultiMsgRouter implements the Router
268
// interface.
269
var _ Router = (*MultiMsgRouter)(nil)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc