• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13566028875

27 Feb 2025 12:09PM UTC coverage: 49.396% (-9.4%) from 58.748%
13566028875

Pull #9555

github

ellemouton
graph/db: populate the graph cache in Start instead of during construction

In this commit, we move the graph cache population logic out of the
ChannelGraph constructor and into its Start method instead.
Pull Request #9555: graph: extract cache from CRUD [6]

34 of 54 new or added lines in 4 files covered. (62.96%)

27464 existing lines in 436 files now uncovered.

101095 of 204664 relevant lines covered (49.4%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/protofsm/state_machine.go
1
package protofsm
2

3
import (
4
        "context"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "github.com/btcsuite/btcd/btcec/v2"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/btcsuite/btclog/v2"
13
        "github.com/lightningnetwork/lnd/chainntnfs"
14
        "github.com/lightningnetwork/lnd/fn/v2"
15
        "github.com/lightningnetwork/lnd/lnutils"
16
        "github.com/lightningnetwork/lnd/lnwire"
17
)
18

19
// pollInterval is the default period used when polling a SendWhen predicate;
// it applies whenever no custom poll interval has been configured.
const pollInterval = 100 * time.Millisecond
24

25
// ErrStateMachineShutdown is returned when an operation is attempted on a
// StateMachine that has been asked to Stop.
var ErrStateMachineShutdown = fmt.Errorf("StateMachine is shutting down")
30

31
// EmittedEvent is a special type that can be emitted by a state transition.
32
// This can container internal events which are to be routed back to the state,
33
// or external events which are to be sent to the daemon.
34
type EmittedEvent[Event any] struct {
35
        // InternalEvent is an optional internal event that is to be routed
36
        // back to the target state. This enables state to trigger one or many
37
        // state transitions without a new external event.
38
        InternalEvent []Event
39

40
        // ExternalEvent is an optional external event that is to be sent to
41
        // the daemon for dispatch. Usually, this is some form of I/O.
42
        ExternalEvents DaemonEventSet
43
}
44

45
// StateTransition is a state transition type. It denotes the next state to go
46
// to, and also the set of events to emit.
47
type StateTransition[Event any, Env Environment] struct {
48
        // NextState is the next state to transition to.
49
        NextState State[Event, Env]
50

51
        // NewEvents is the set of events to emit.
52
        NewEvents fn.Option[EmittedEvent[Event]]
53
}
54

55
// Environment abstracts the environment that a state machine executes
// against. The state machine executor itself only needs the environment's
// name; everything else is up to the concrete states.
type Environment interface {
	// Name returns the name of the environment. It is used to uniquely
	// identify the environment of related state machines.
	Name() string
}
64

65
// State defines an abstract state along, namely its state transition function
66
// that takes as input an event and an environment, and returns a state
67
// transition (next state, and set of events to emit). As state can also either
68
// be terminal, or not, a terminal event causes state execution to halt.
69
type State[Event any, Env Environment] interface {
70
        // ProcessEvent takes an event and an environment, and returns a new
71
        // state transition. This will be iteratively called until either a
72
        // terminal state is reached, or no further internal events are
73
        // emitted.
74
        ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error)
75

76
        // IsTerminal returns true if this state is terminal, and false
77
        // otherwise.
78
        IsTerminal() bool
79

80
        // TODO(roasbeef): also add state serialization?
81
}
82

83
// DaemonAdapters is a set of methods that server as adapters to bridge the
84
// pure world of the FSM to the real world of the daemon. These will be used to
85
// do things like broadcast transactions, or send messages to peers.
86
type DaemonAdapters interface {
87
        // SendMessages sends the target set of messages to the target peer.
88
        SendMessages(btcec.PublicKey, []lnwire.Message) error
89

90
        // BroadcastTransaction broadcasts a transaction with the target label.
91
        BroadcastTransaction(*wire.MsgTx, string) error
92

93
        // RegisterConfirmationsNtfn registers an intent to be notified once
94
        // txid reaches numConfs confirmations. We also pass in the pkScript as
95
        // the default light client instead needs to match on scripts created
96
        // in the block. If a nil txid is passed in, then not only should we
97
        // match on the script, but we should also dispatch once the
98
        // transaction containing the script reaches numConfs confirmations.
99
        // This can be useful in instances where we only know the script in
100
        // advance, but not the transaction containing it.
101
        //
102
        // TODO(roasbeef): could abstract further?
103
        RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte,
104
                numConfs, heightHint uint32,
105
                opts ...chainntnfs.NotifierOption,
106
        ) (*chainntnfs.ConfirmationEvent, error)
107

108
        // RegisterSpendNtfn registers an intent to be notified once the target
109
        // outpoint is successfully spent within a transaction. The script that
110
        // the outpoint creates must also be specified. This allows this
111
        // interface to be implemented by BIP 158-like filtering.
112
        RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte,
113
                heightHint uint32) (*chainntnfs.SpendEvent, error)
114
}
115

116
// stateQuery is used by outside callers to query the internal state of the
117
// state machine.
118
type stateQuery[Event any, Env Environment] struct {
119
        // CurrentState is a channel that will be sent the current state of the
120
        // state machine.
121
        CurrentState chan State[Event, Env]
122
}
123

124
// StateMachine represents an abstract FSM that is able to process new incoming
125
// events and drive a state machine to termination. This implementation uses
126
// type params to abstract over the types of events and environment. Events
127
// trigger new state transitions, that use the environment to perform some
128
// action.
129
//
130
// TODO(roasbeef): terminal check, daemon event execution, init?
131
type StateMachine[Event any, Env Environment] struct {
132
        cfg StateMachineCfg[Event, Env]
133

134
        log btclog.Logger
135

136
        // events is the channel that will be used to send new events to the
137
        // FSM.
138
        events chan Event
139

140
        // newStateEvents is an EventDistributor that will be used to notify
141
        // any relevant callers of new state transitions that occur.
142
        newStateEvents *fn.EventDistributor[State[Event, Env]]
143

144
        // stateQuery is a channel that will be used by outside callers to
145
        // query the internal state machine state.
146
        stateQuery chan stateQuery[Event, Env]
147

148
        gm   fn.GoroutineManager
149
        quit chan struct{}
150

151
        startOnce sync.Once
152
        stopOnce  sync.Once
153
}
154

155
// ErrorReporter is the sink for errors that occur while the state machine is
// executing.
type ErrorReporter interface {
	// ReportError is called with an error that occurred during state
	// machine execution.
	ReportError(err error)
}
162

163
// StateMachineCfg is a configuration struct that's used to create a new state
164
// machine.
165
type StateMachineCfg[Event any, Env Environment] struct {
166
        // ErrorReporter is used to report errors that occur during state
167
        // transitions.
168
        ErrorReporter ErrorReporter
169

170
        // Daemon is a set of adapters that will be used to bridge the FSM to
171
        // the daemon.
172
        Daemon DaemonAdapters
173

174
        // InitialState is the initial state of the state machine.
175
        InitialState State[Event, Env]
176

177
        // Env is the environment that the state machine will use to execute.
178
        Env Env
179

180
        // InitEvent is an optional event that will be sent to the state
181
        // machine as if it was emitted at the onset of the state machine. This
182
        // can be used to set up tracking state such as a txid confirmation
183
        // event.
184
        InitEvent fn.Option[DaemonEvent]
185

186
        // MsgMapper is an optional message mapper that can be used to map
187
        // normal wire messages into FSM events.
188
        MsgMapper fn.Option[MsgMapper[Event]]
189

190
        // CustomPollInterval is an optional custom poll interval that can be
191
        // used to set a quicker interval for tests.
192
        CustomPollInterval fn.Option[time.Duration]
193
}
194

195
// NewStateMachine creates a new state machine given a set of daemon adapters,
196
// an initial state, an environment, and an event to process as if emitted at
197
// the onset of the state machine. Such an event can be used to set up tracking
198
// state such as a txid confirmation event.
199
func NewStateMachine[Event any, Env Environment](
UNCOV
200
        cfg StateMachineCfg[Event, Env]) StateMachine[Event, Env] {
×
UNCOV
201

×
UNCOV
202
        return StateMachine[Event, Env]{
×
UNCOV
203
                cfg: cfg,
×
UNCOV
204
                log: log.WithPrefix(
×
UNCOV
205
                        fmt.Sprintf("FSM(%v):", cfg.Env.Name()),
×
UNCOV
206
                ),
×
UNCOV
207
                events:         make(chan Event, 1),
×
UNCOV
208
                stateQuery:     make(chan stateQuery[Event, Env]),
×
UNCOV
209
                gm:             *fn.NewGoroutineManager(),
×
UNCOV
210
                newStateEvents: fn.NewEventDistributor[State[Event, Env]](),
×
UNCOV
211
                quit:           make(chan struct{}),
×
UNCOV
212
        }
×
UNCOV
213
}
×
214

215
// Start starts the state machine. This will spawn a goroutine that will drive
216
// the state machine to completion.
UNCOV
217
func (s *StateMachine[Event, Env]) Start(ctx context.Context) {
×
UNCOV
218
        s.startOnce.Do(func() {
×
UNCOV
219
                _ = s.gm.Go(ctx, func(ctx context.Context) {
×
UNCOV
220
                        s.driveMachine(ctx)
×
UNCOV
221
                })
×
222
        })
223
}
224

225
// Stop stops the state machine. This will block until the state machine has
226
// reached a stopping point.
UNCOV
227
func (s *StateMachine[Event, Env]) Stop() {
×
UNCOV
228
        s.stopOnce.Do(func() {
×
UNCOV
229
                close(s.quit)
×
UNCOV
230
                s.gm.Stop()
×
UNCOV
231
        })
×
232
}
233

234
// SendEvent sends a new event to the state machine.
235
//
236
// TODO(roasbeef): bool if processed?
UNCOV
237
func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) {
×
UNCOV
238
        s.log.DebugS(ctx, "Sending event",
×
UNCOV
239
                "event", lnutils.SpewLogClosure(event))
×
UNCOV
240

×
UNCOV
241
        select {
×
UNCOV
242
        case s.events <- event:
×
243
        case <-ctx.Done():
×
244
                return
×
245
        case <-s.quit:
×
246
                return
×
247
        }
248
}
249

250
// CanHandle returns true if the target message can be routed to the state
251
// machine.
UNCOV
252
func (s *StateMachine[Event, Env]) CanHandle(msg lnwire.Message) bool {
×
UNCOV
253
        cfgMapper := s.cfg.MsgMapper
×
UNCOV
254
        return fn.MapOptionZ(cfgMapper, func(mapper MsgMapper[Event]) bool {
×
UNCOV
255
                return mapper.MapMsg(msg).IsSome()
×
UNCOV
256
        })
×
257
}
258

259
// Name returns the name of the state machine's environment.
260
func (s *StateMachine[Event, Env]) Name() string {
×
261
        return s.cfg.Env.Name()
×
262
}
×
263

264
// SendMessage attempts to send a wire message to the state machine. If the
265
// message can be mapped using the default message mapper, then true is
266
// returned indicating that the message was processed. Otherwise, false is
267
// returned.
268
func (s *StateMachine[Event, Env]) SendMessage(ctx context.Context,
UNCOV
269
        msg lnwire.Message) bool {
×
UNCOV
270

×
UNCOV
271
        // If we have no message mapper, then return false as we can't process
×
UNCOV
272
        // this message.
×
UNCOV
273
        if !s.cfg.MsgMapper.IsSome() {
×
274
                return false
×
275
        }
×
276

UNCOV
277
        s.log.DebugS(ctx, "Sending msg", "msg", lnutils.SpewLogClosure(msg))
×
UNCOV
278

×
UNCOV
279
        // Otherwise, try to map the message using the default message mapper.
×
UNCOV
280
        // If we can't extract an event, then we'll return false to indicate
×
UNCOV
281
        // that the message wasn't processed.
×
UNCOV
282
        var processed bool
×
UNCOV
283
        s.cfg.MsgMapper.WhenSome(func(mapper MsgMapper[Event]) {
×
UNCOV
284
                event := mapper.MapMsg(msg)
×
UNCOV
285

×
UNCOV
286
                event.WhenSome(func(event Event) {
×
UNCOV
287
                        s.SendEvent(ctx, event)
×
UNCOV
288

×
UNCOV
289
                        processed = true
×
UNCOV
290
                })
×
291
        })
292

UNCOV
293
        return processed
×
294
}
295

296
// CurrentState returns the current state of the state machine.
UNCOV
297
func (s *StateMachine[Event, Env]) CurrentState() (State[Event, Env], error) {
×
UNCOV
298
        query := stateQuery[Event, Env]{
×
UNCOV
299
                CurrentState: make(chan State[Event, Env], 1),
×
UNCOV
300
        }
×
UNCOV
301

×
UNCOV
302
        if !fn.SendOrQuit(s.stateQuery, query, s.quit) {
×
303
                return nil, ErrStateMachineShutdown
×
304
        }
×
305

UNCOV
306
        return fn.RecvOrTimeout(query.CurrentState, time.Second)
×
307
}
308

309
// StateSubscriber represents an active subscription to be notified of new
310
// state transitions.
311
type StateSubscriber[E any, F Environment] *fn.EventReceiver[State[E, F]]
312

313
// RegisterStateEvents registers a new event listener that will be notified of
314
// new state transitions.
315
func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[
UNCOV
316
        Event, Env] {
×
UNCOV
317

×
UNCOV
318
        subscriber := fn.NewEventReceiver[State[Event, Env]](10)
×
UNCOV
319

×
UNCOV
320
        // TODO(roasbeef): instead give the state and the input event?
×
UNCOV
321

×
UNCOV
322
        s.newStateEvents.RegisterSubscriber(subscriber)
×
UNCOV
323

×
UNCOV
324
        return subscriber
×
UNCOV
325
}
×
326

327
// RemoveStateSub removes the target state subscriber from the set of active
328
// subscribers.
329
func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[
UNCOV
330
        Event, Env]) {
×
UNCOV
331

×
UNCOV
332
        _ = s.newStateEvents.RemoveSubscriber(sub)
×
UNCOV
333
}
×
334

335
// executeDaemonEvent executes a daemon event, which is a special type of event
336
// that can be emitted as part of the state transition function of the state
337
// machine. An error is returned if the type of event is unknown.
338
func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
UNCOV
339
        event DaemonEvent) error {
×
UNCOV
340

×
UNCOV
341
        switch daemonEvent := event.(type) {
×
342
        // This is a send message event, so we'll send the event, and also mind
343
        // any preconditions as well as post-send events.
UNCOV
344
        case *SendMsgEvent[Event]:
×
UNCOV
345
                sendAndCleanUp := func() error {
×
UNCOV
346
                        s.log.DebugS(ctx, "Sending message to target",
×
UNCOV
347
                                btclog.Hex6("target", daemonEvent.TargetPeer.SerializeCompressed()),
×
UNCOV
348
                                "messages", lnutils.SpewLogClosure(daemonEvent.Msgs))
×
UNCOV
349

×
UNCOV
350
                        err := s.cfg.Daemon.SendMessages(
×
UNCOV
351
                                daemonEvent.TargetPeer, daemonEvent.Msgs,
×
UNCOV
352
                        )
×
UNCOV
353
                        if err != nil {
×
354
                                return fmt.Errorf("unable to send msgs: %w",
×
355
                                        err)
×
356
                        }
×
357

358
                        // If a post-send event was specified, then we'll funnel
359
                        // that back into the main state machine now as well.
UNCOV
360
                        return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll
×
UNCOV
361
                                launched := s.gm.Go(
×
UNCOV
362
                                        ctx, func(ctx context.Context) {
×
UNCOV
363
                                                s.log.DebugS(ctx, "Sending post-send event",
×
UNCOV
364
                                                        "event", lnutils.SpewLogClosure(event))
×
UNCOV
365

×
UNCOV
366
                                                s.SendEvent(ctx, event)
×
UNCOV
367
                                        },
×
368
                                )
369

UNCOV
370
                                if !launched {
×
371
                                        return ErrStateMachineShutdown
×
372
                                }
×
373

UNCOV
374
                                return nil
×
375
                        })
376
                }
377

378
                // If this doesn't have a SendWhen predicate, then we can just
379
                // send it off right away.
UNCOV
380
                if !daemonEvent.SendWhen.IsSome() {
×
UNCOV
381
                        return sendAndCleanUp()
×
UNCOV
382
                }
×
383

384
                // Otherwise, this has a SendWhen predicate, so we'll need
385
                // launch a goroutine to poll the SendWhen, then send only once
386
                // the predicate is true.
UNCOV
387
                launched := s.gm.Go(ctx, func(ctx context.Context) {
×
UNCOV
388
                        predicateTicker := time.NewTicker(
×
UNCOV
389
                                s.cfg.CustomPollInterval.UnwrapOr(pollInterval),
×
UNCOV
390
                        )
×
UNCOV
391
                        defer predicateTicker.Stop()
×
UNCOV
392

×
UNCOV
393
                        s.log.InfoS(ctx, "Waiting for send predicate to be true")
×
UNCOV
394

×
UNCOV
395
                        for {
×
UNCOV
396
                                select {
×
UNCOV
397
                                case <-predicateTicker.C:
×
UNCOV
398
                                        canSend := fn.MapOptionZ(
×
UNCOV
399
                                                daemonEvent.SendWhen,
×
UNCOV
400
                                                func(pred SendPredicate) bool {
×
UNCOV
401
                                                        return pred()
×
UNCOV
402
                                                },
×
403
                                        )
404

UNCOV
405
                                        if canSend {
×
UNCOV
406
                                                s.log.InfoS(ctx, "Send active predicate")
×
UNCOV
407

×
UNCOV
408
                                                err := sendAndCleanUp()
×
UNCOV
409
                                                if err != nil {
×
410
                                                        s.log.ErrorS(ctx, "Unable to send message", err)
×
411
                                                }
×
412

UNCOV
413
                                                return
×
414
                                        }
415

UNCOV
416
                                case <-ctx.Done():
×
UNCOV
417
                                        return
×
418
                                }
419
                        }
420
                })
421

UNCOV
422
                if !launched {
×
423
                        return ErrStateMachineShutdown
×
424
                }
×
425

UNCOV
426
                return nil
×
427

428
        // If this is a broadcast transaction event, then we'll broadcast with
429
        // the label attached.
UNCOV
430
        case *BroadcastTxn:
×
UNCOV
431
                s.log.DebugS(ctx, "Broadcasting txn",
×
UNCOV
432
                        "txid", daemonEvent.Tx.TxHash())
×
UNCOV
433

×
UNCOV
434
                err := s.cfg.Daemon.BroadcastTransaction(
×
UNCOV
435
                        daemonEvent.Tx, daemonEvent.Label,
×
UNCOV
436
                )
×
UNCOV
437
                if err != nil {
×
438
                        return fmt.Errorf("unable to broadcast txn: %w", err)
×
439
                }
×
440

UNCOV
441
                return nil
×
442

443
        // The state machine has requested a new event to be sent once a
444
        // transaction spending a specified outpoint has confirmed.
UNCOV
445
        case *RegisterSpend[Event]:
×
UNCOV
446
                s.log.DebugS(ctx, "Registering spend",
×
UNCOV
447
                        "outpoint", daemonEvent.OutPoint)
×
UNCOV
448

×
UNCOV
449
                spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn(
×
UNCOV
450
                        &daemonEvent.OutPoint, daemonEvent.PkScript,
×
UNCOV
451
                        daemonEvent.HeightHint,
×
UNCOV
452
                )
×
UNCOV
453
                if err != nil {
×
454
                        return fmt.Errorf("unable to register spend: %w", err)
×
455
                }
×
456

UNCOV
457
                launched := s.gm.Go(ctx, func(ctx context.Context) {
×
UNCOV
458
                        for {
×
UNCOV
459
                                select {
×
460
                                case spend, ok := <-spendEvent.Spend:
×
461
                                        if !ok {
×
462
                                                return
×
463
                                        }
×
464

465
                                        // If there's a post-send event, then
466
                                        // we'll send that into the current
467
                                        // state now.
468
                                        postSpend := daemonEvent.PostSpendEvent
×
469
                                        postSpend.WhenSome(func(f SpendMapper[Event]) { //nolint:ll
×
470
                                                customEvent := f(spend)
×
471
                                                s.SendEvent(ctx, customEvent)
×
472
                                        })
×
473

474
                                        return
×
475

UNCOV
476
                                case <-ctx.Done():
×
UNCOV
477
                                        return
×
478
                                }
479
                        }
480
                })
481

UNCOV
482
                if !launched {
×
483
                        return ErrStateMachineShutdown
×
484
                }
×
485

UNCOV
486
                return nil
×
487

488
        // The state machine has requested a new event to be sent once a
489
        // specified txid+pkScript pair has confirmed.
490
        case *RegisterConf[Event]:
×
491
                s.log.DebugS(ctx, "Registering conf",
×
492
                        "txid", daemonEvent.Txid)
×
493

×
494
                numConfs := daemonEvent.NumConfs.UnwrapOr(1)
×
495
                confEvent, err := s.cfg.Daemon.RegisterConfirmationsNtfn(
×
496
                        &daemonEvent.Txid, daemonEvent.PkScript,
×
497
                        numConfs, daemonEvent.HeightHint,
×
498
                )
×
499
                if err != nil {
×
500
                        return fmt.Errorf("unable to register conf: %w", err)
×
501
                }
×
502

503
                launched := s.gm.Go(ctx, func(ctx context.Context) {
×
504
                        for {
×
505
                                select {
×
506
                                case <-confEvent.Confirmed:
×
507
                                        // If there's a post-conf event, then
×
508
                                        // we'll send that into the current
×
509
                                        // state now.
×
510
                                        //
×
511
                                        // TODO(roasbeef): refactor to
×
512
                                        // dispatchAfterRecv w/ above
×
513
                                        postConf := daemonEvent.PostConfEvent
×
514
                                        postConf.WhenSome(func(e Event) {
×
515
                                                s.SendEvent(ctx, e)
×
516
                                        })
×
517

518
                                        return
×
519

520
                                case <-ctx.Done():
×
521
                                        return
×
522
                                }
523
                        }
524
                })
525

526
                if !launched {
×
527
                        return ErrStateMachineShutdown
×
528
                }
×
529

530
                return nil
×
531
        }
532

533
        return fmt.Errorf("unknown daemon event: %T", event)
×
534
}
535

536
// applyEvents applies a new event to the state machine. This will continue
537
// until no further events are emitted by the state machine. Along the way,
538
// we'll also ensure to execute any daemon events that are emitted.
539
func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
540
        currentState State[Event, Env], newEvent Event) (State[Event, Env],
UNCOV
541
        error) {
×
UNCOV
542

×
UNCOV
543
        s.log.DebugS(ctx, "Applying new event",
×
UNCOV
544
                "event", lnutils.SpewLogClosure(newEvent))
×
UNCOV
545

×
UNCOV
546
        eventQueue := fn.NewQueue(newEvent)
×
UNCOV
547

×
UNCOV
548
        // Given the next event to handle, we'll process the event, then add
×
UNCOV
549
        // any new emitted internal events to our event queue. This continues
×
UNCOV
550
        // until we reach a terminal state, or we run out of internal events to
×
UNCOV
551
        // process.
×
UNCOV
552
        //
×
UNCOV
553
        //nolint:ll
×
UNCOV
554
        for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() {
×
UNCOV
555
                err := fn.MapOptionZ(nextEvent, func(event Event) error {
×
UNCOV
556
                        s.log.DebugS(ctx, "Processing event",
×
UNCOV
557
                                "event", lnutils.SpewLogClosure(event))
×
UNCOV
558

×
UNCOV
559
                        // Apply the state transition function of the current
×
UNCOV
560
                        // state given this new event and our existing env.
×
UNCOV
561
                        transition, err := currentState.ProcessEvent(
×
UNCOV
562
                                event, s.cfg.Env,
×
UNCOV
563
                        )
×
UNCOV
564
                        if err != nil {
×
UNCOV
565
                                return err
×
UNCOV
566
                        }
×
567

UNCOV
568
                        newEvents := transition.NewEvents
×
UNCOV
569
                        err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error { //nolint:ll
×
UNCOV
570
                                // With the event processed, we'll process any
×
UNCOV
571
                                // new daemon events that were emitted as part
×
UNCOV
572
                                // of this new state transition.
×
UNCOV
573
                                for _, dEvent := range events.ExternalEvents {
×
UNCOV
574
                                        err := s.executeDaemonEvent(
×
UNCOV
575
                                                ctx, dEvent,
×
UNCOV
576
                                        )
×
UNCOV
577
                                        if err != nil {
×
578
                                                return err
×
579
                                        }
×
580
                                }
581

582
                                // Next, we'll add any new emitted events to our
583
                                // event queue.
584
                                //
585
                                //nolint:ll
UNCOV
586
                                for _, inEvent := range events.InternalEvent {
×
UNCOV
587
                                        s.log.DebugS(ctx, "Adding new internal event to queue",
×
UNCOV
588
                                                "event", lnutils.SpewLogClosure(inEvent))
×
UNCOV
589

×
UNCOV
590
                                        eventQueue.Enqueue(inEvent)
×
UNCOV
591
                                }
×
592

UNCOV
593
                                return nil
×
594
                        })
UNCOV
595
                        if err != nil {
×
596
                                return err
×
597
                        }
×
598

UNCOV
599
                        s.log.InfoS(ctx, "State transition",
×
UNCOV
600
                                btclog.Fmt("from_state", "%T", currentState),
×
UNCOV
601
                                btclog.Fmt("to_state", "%T", transition.NextState))
×
UNCOV
602

×
UNCOV
603
                        // With our events processed, we'll now update our
×
UNCOV
604
                        // internal state.
×
UNCOV
605
                        currentState = transition.NextState
×
UNCOV
606

×
UNCOV
607
                        // Notify our subscribers of the new state transition.
×
UNCOV
608
                        //
×
UNCOV
609
                        // TODO(roasbeef): will only give us the outer state?
×
UNCOV
610
                        //  * let FSMs choose which state to emit?
×
UNCOV
611
                        s.newStateEvents.NotifySubscribers(currentState)
×
UNCOV
612

×
UNCOV
613
                        return nil
×
614
                })
UNCOV
615
                if err != nil {
×
UNCOV
616
                        return currentState, err
×
UNCOV
617
                }
×
618
        }
619

UNCOV
620
        return currentState, nil
×
621
}
622

623
// driveMachine is the main event loop of the state machine. It accepts any new
624
// incoming events, and then drives the state machine forward until it reaches
625
// a terminal state.
UNCOV
626
func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
×
UNCOV
627
        s.log.DebugS(ctx, "Starting state machine")
×
UNCOV
628

×
UNCOV
629
        currentState := s.cfg.InitialState
×
UNCOV
630

×
UNCOV
631
        // Before we start, if we have an init daemon event specified, then
×
UNCOV
632
        // we'll handle that now.
×
UNCOV
633
        err := fn.MapOptionZ(s.cfg.InitEvent, func(event DaemonEvent) error {
×
UNCOV
634
                return s.executeDaemonEvent(ctx, event)
×
UNCOV
635
        })
×
UNCOV
636
        if err != nil {
×
637
                s.log.ErrorS(ctx, "Unable to execute init event", err)
×
638
                return
×
639
        }
×
640

641
        // We just started driving the state machine, so we'll notify our
642
        // subscribers of this starting state.
UNCOV
643
        s.newStateEvents.NotifySubscribers(currentState)
×
UNCOV
644

×
UNCOV
645
        for {
×
UNCOV
646
                select {
×
647
                // We have a new external event, so we'll drive the state
648
                // machine forward until we either run out of internal events,
649
                // or we reach a terminal state.
UNCOV
650
                case newEvent := <-s.events:
×
UNCOV
651
                        newState, err := s.applyEvents(
×
UNCOV
652
                                ctx, currentState, newEvent,
×
UNCOV
653
                        )
×
UNCOV
654
                        if err != nil {
×
UNCOV
655
                                s.cfg.ErrorReporter.ReportError(err)
×
UNCOV
656

×
UNCOV
657
                                s.log.ErrorS(ctx, "Unable to apply event", err)
×
UNCOV
658

×
UNCOV
659
                                // An error occurred, so we'll tear down the
×
UNCOV
660
                                // entire state machine as we can't proceed.
×
UNCOV
661
                                go s.Stop()
×
UNCOV
662

×
UNCOV
663
                                return
×
UNCOV
664
                        }
×
665

UNCOV
666
                        currentState = newState
×
667

668
                // An outside caller is querying our state, so we'll return the
669
                // latest state.
UNCOV
670
                case stateQuery := <-s.stateQuery:
×
UNCOV
671
                        if !fn.SendOrQuit(stateQuery.CurrentState, currentState, s.quit) { //nolint:ll
×
672
                                return
×
673
                        }
×
674

UNCOV
675
                case <-s.gm.Done():
×
UNCOV
676
                        return
×
677
                }
678
        }
679
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc