• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15838907453

24 Jun 2025 01:26AM UTC coverage: 57.079% (-11.1%) from 68.172%
15838907453

Pull #9982

github

web-flow
Merge e42780be2 into 45c15646c
Pull Request #9982: lnwire+lnwallet: add LocalNonces field for splice nonce coordination w/ taproot channels

103 of 167 new or added lines in 5 files covered. (61.68%)

30191 existing lines in 463 files now uncovered.

96331 of 168768 relevant lines covered (57.08%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.09
/msgmux/msg_router.go
1
package msgmux
2

3
import (
4
        "context"
5
        "fmt"
6
        "maps"
7
        "sync"
8

9
        "github.com/btcsuite/btcd/btcec/v2"
10
        "github.com/lightningnetwork/lnd/fn/v2"
11
        "github.com/lightningnetwork/lnd/lnwire"
12
)
13

14
var (
15
        // ErrDuplicateEndpoint is returned when an endpoint is registered with
16
        // a name that already exists.
17
        ErrDuplicateEndpoint = fmt.Errorf("endpoint already registered")
18

19
        // ErrUnableToRouteMsg is returned when a message is unable to be
20
        // routed to any endpoints.
21
        ErrUnableToRouteMsg = fmt.Errorf("unable to route message")
22
)
23

24
// EndpointName is the name of a given endpoint. This MUST be unique across all
25
// registered endpoints.
26
type EndpointName = string
27

28
// PeerMsg is a wire message that includes the public key of the peer that sent
29
// it.
30
type PeerMsg struct {
31
        lnwire.Message
32

33
        // PeerPub is the public key of the peer that sent this message.
34
        PeerPub btcec.PublicKey
35
}
36

37
// Endpoint is an interface that represents a message endpoint, or the
38
// sub-system that will handle processing an incoming wire message.
39
type Endpoint interface {
40
        // Name returns the name of this endpoint. This MUST be unique across
41
        // all registered endpoints.
42
        Name() EndpointName
43

44
        // CanHandle returns true if the target message can be routed to this
45
        // endpoint.
46
        CanHandle(msg PeerMsg) bool
47

48
        // SendMessage handles the target message, and returns true if the
49
        // message was able being processed.
50
        SendMessage(ctx context.Context, msg PeerMsg) bool
51
}
52

53
// MsgRouter is an interface that represents a message router, which is generic
54
// sub-system capable of routing any incoming wire message to a set of
55
// registered endpoints.
56
type Router interface {
57
        // RegisterEndpoint registers a new endpoint with the router. If a
58
        // duplicate endpoint exists, an error is returned.
59
        RegisterEndpoint(Endpoint) error
60

61
        // UnregisterEndpoint unregisters the target endpoint from the router.
62
        UnregisterEndpoint(EndpointName) error
63

64
        // RouteMsg attempts to route the target message to a registered
65
        // endpoint. If ANY endpoint could handle the message, then nil is
66
        // returned. Otherwise, ErrUnableToRouteMsg is returned.
67
        RouteMsg(PeerMsg) error
68

69
        // Start starts the peer message router.
70
        Start(ctx context.Context)
71

72
        // Stop stops the peer message router.
73
        Stop()
74
}
75

76
// sendQuery sends a query to the main event loop, and returns the response.
77
func sendQuery[Q any, R any](sendChan chan fn.Req[Q, R], queryArg Q,
78
        quit chan struct{}) fn.Result[R] {
1✔
79

1✔
80
        query, respChan := fn.NewReq[Q, R](queryArg)
1✔
81

1✔
82
        if !fn.SendOrQuit(sendChan, query, quit) {
1✔
83
                return fn.Errf[R]("router shutting down")
×
84
        }
×
85

86
        return fn.NewResult(fn.RecvResp(respChan, nil, quit))
1✔
87
}
88

89
// sendQueryErr is a helper function based on sendQuery that can be used when
90
// the query only needs an error response.
91
func sendQueryErr[Q any](sendChan chan fn.Req[Q, error], queryArg Q,
92
        quitChan chan struct{}) error {
1✔
93

1✔
94
        return fn.ElimEither(
1✔
95
                sendQuery(sendChan, queryArg, quitChan).Either,
1✔
96
                fn.Iden, fn.Iden,
1✔
97
        )
1✔
98
}
1✔
99

100
// EndpointsMap is a map of all registered endpoints.
101
type EndpointsMap map[EndpointName]Endpoint
102

103
// MultiMsgRouter is a type of message router that is capable of routing new
104
// incoming messages, permitting a message to be routed to multiple registered
105
// endpoints.
106
type MultiMsgRouter struct {
107
        startOnce sync.Once
108
        stopOnce  sync.Once
109

110
        // registerChan is the channel that all new endpoints will be sent to.
111
        registerChan chan fn.Req[Endpoint, error]
112

113
        // unregisterChan is the channel that all endpoints that are to be
114
        // removed are sent to.
115
        unregisterChan chan fn.Req[EndpointName, error]
116

117
        // msgChan is the channel that all messages will be sent to for
118
        // processing.
119
        msgChan chan fn.Req[PeerMsg, error]
120

121
        // endpointsQueries is a channel that all queries to the endpoints map
122
        // will be sent to.
123
        endpointQueries chan fn.Req[Endpoint, EndpointsMap]
124

125
        wg   sync.WaitGroup
126
        quit chan struct{}
127
}
128

129
// NewMultiMsgRouter creates a new instance of a peer message router.
130
func NewMultiMsgRouter() *MultiMsgRouter {
1✔
131
        return &MultiMsgRouter{
1✔
132
                registerChan:    make(chan fn.Req[Endpoint, error]),
1✔
133
                unregisterChan:  make(chan fn.Req[EndpointName, error]),
1✔
134
                msgChan:         make(chan fn.Req[PeerMsg, error]),
1✔
135
                endpointQueries: make(chan fn.Req[Endpoint, EndpointsMap]),
1✔
136
                quit:            make(chan struct{}),
1✔
137
        }
1✔
138
}
1✔
139

140
// Start starts the peer message router.
141
func (p *MultiMsgRouter) Start(ctx context.Context) {
1✔
142
        log.Infof("Starting Router")
1✔
143

1✔
144
        p.startOnce.Do(func() {
2✔
145
                p.wg.Add(1)
1✔
146
                go p.msgRouter(ctx)
1✔
147
        })
1✔
148
}
149

150
// Stop stops the peer message router.
151
func (p *MultiMsgRouter) Stop() {
1✔
152
        log.Infof("Stopping Router")
1✔
153

1✔
154
        p.stopOnce.Do(func() {
2✔
155
                close(p.quit)
1✔
156
                p.wg.Wait()
1✔
157
        })
1✔
158
}
159

160
// RegisterEndpoint registers a new endpoint with the router. If a duplicate
161
// endpoint exists, an error is returned.
162
func (p *MultiMsgRouter) RegisterEndpoint(endpoint Endpoint) error {
1✔
163
        return sendQueryErr(p.registerChan, endpoint, p.quit)
1✔
164
}
1✔
165

166
// UnregisterEndpoint unregisters the target endpoint from the router.
167
func (p *MultiMsgRouter) UnregisterEndpoint(name EndpointName) error {
1✔
168
        return sendQueryErr(p.unregisterChan, name, p.quit)
1✔
169
}
1✔
170

171
// RouteMsg attempts to route the target message to a registered endpoint. If
172
// ANY endpoint could handle the message, then nil is returned.
173
func (p *MultiMsgRouter) RouteMsg(msg PeerMsg) error {
1✔
174
        return sendQueryErr(p.msgChan, msg, p.quit)
1✔
175
}
1✔
176

177
// Endpoints returns a list of all registered endpoints.
UNCOV
178
func (p *MultiMsgRouter) endpoints() fn.Result[EndpointsMap] {
×
UNCOV
179
        return sendQuery(p.endpointQueries, nil, p.quit)
×
UNCOV
180
}
×
181

182
// msgRouter is the main goroutine that handles all incoming messages.
183
func (p *MultiMsgRouter) msgRouter(ctx context.Context) {
1✔
184
        defer p.wg.Done()
1✔
185

1✔
186
        // endpoints is a map of all registered endpoints.
1✔
187
        endpoints := make(map[EndpointName]Endpoint)
1✔
188

1✔
189
        for {
2✔
190
                select {
1✔
191
                // A new endpoint was just sent in, so we'll add it to our set
192
                // of registered endpoints.
193
                case newEndpointMsg := <-p.registerChan:
1✔
194
                        endpoint := newEndpointMsg.Request
1✔
195

1✔
196
                        log.Infof("MsgRouter: registering new "+
1✔
197
                                "Endpoint(%s)", endpoint.Name())
1✔
198

1✔
199
                        // If this endpoint already exists, then we'll return
1✔
200
                        // an error as we require unique names.
1✔
201
                        if _, ok := endpoints[endpoint.Name()]; ok {
1✔
UNCOV
202
                                log.Errorf("MsgRouter: rejecting "+
×
UNCOV
203
                                        "duplicate endpoint: %v",
×
UNCOV
204
                                        endpoint.Name())
×
UNCOV
205

×
UNCOV
206
                                newEndpointMsg.Resolve(ErrDuplicateEndpoint)
×
UNCOV
207

×
UNCOV
208
                                continue
×
209
                        }
210

211
                        endpoints[endpoint.Name()] = endpoint
1✔
212

1✔
213
                        newEndpointMsg.Resolve(nil)
1✔
214

215
                // A request to unregister an endpoint was just sent in, so
216
                // we'll attempt to remove it.
217
                case endpointName := <-p.unregisterChan:
1✔
218
                        delete(endpoints, endpointName.Request)
1✔
219

1✔
220
                        log.Infof("MsgRouter: unregistering "+
1✔
221
                                "Endpoint(%s)", endpointName.Request)
1✔
222

1✔
223
                        endpointName.Resolve(nil)
1✔
224

225
                // A new message was just sent in. We'll attempt to route it to
226
                // all the endpoints that can handle it.
227
                case msgQuery := <-p.msgChan:
1✔
228
                        msg := msgQuery.Request
1✔
229

1✔
230
                        // Loop through all the endpoints and send the message
1✔
231
                        // to those that can handle it the message.
1✔
232
                        var couldSend bool
1✔
233
                        for _, endpoint := range endpoints {
2✔
234
                                if endpoint.CanHandle(msg) {
2✔
235
                                        log.Tracef("MsgRouter: sending "+
1✔
236
                                                "msg %T to endpoint %s", msg,
1✔
237
                                                endpoint.Name())
1✔
238

1✔
239
                                        sent := endpoint.SendMessage(ctx, msg)
1✔
240
                                        couldSend = couldSend || sent
1✔
241
                                }
1✔
242
                        }
243

244
                        var err error
1✔
245
                        if !couldSend {
2✔
246
                                log.Tracef("MsgRouter: unable to route "+
1✔
247
                                        "msg %T", msg.Message)
1✔
248

1✔
249
                                err = ErrUnableToRouteMsg
1✔
250
                        }
1✔
251

252
                        msgQuery.Resolve(err)
1✔
253

254
                // A query for the endpoint state just came in, we'll send back
255
                // a copy of our current state.
UNCOV
256
                case endpointQuery := <-p.endpointQueries:
×
UNCOV
257
                        endpointsCopy := make(EndpointsMap, len(endpoints))
×
UNCOV
258
                        maps.Copy(endpointsCopy, endpoints)
×
UNCOV
259

×
UNCOV
260
                        endpointQuery.Resolve(endpointsCopy)
×
261

262
                case <-p.quit:
1✔
263
                        return
1✔
264
                }
265
        }
266
}
267

268
// A compile time check to ensure MultiMsgRouter implements the MsgRouter
269
// interface.
270
var _ Router = (*MultiMsgRouter)(nil)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc