• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12691974548

09 Jan 2025 02:29PM UTC coverage: 58.72% (+0.1%) from 58.598%
12691974548

Pull #9406

github

Crypt-iQ
go.mod+htlcswitch+protofsm: update fn package to v2.0.7
Pull Request #9406: go.mod+htlcswitch+protofsm: update fn package to v2.0.7

26 of 34 new or added lines in 3 files covered. (76.47%)

72 existing lines in 20 files now uncovered.

135373 of 230538 relevant lines covered (58.72%)

19131.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.23
/protofsm/state_machine.go
1
package protofsm
2

3
import (
4
        "context"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "github.com/btcsuite/btcd/btcec/v2"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/lightningnetwork/lnd/chainntnfs"
13
        "github.com/lightningnetwork/lnd/fn/v2"
14
        "github.com/lightningnetwork/lnd/lnutils"
15
        "github.com/lightningnetwork/lnd/lnwire"
16
)
17

18
const (
19
        // pollInterval is the interval at which we'll poll the SendWhen
20
        // predicate if specified.
21
        pollInterval = time.Millisecond * 100
22
)
23

24
var (
25
        // ErrStateMachineShutdown occurs when trying to feed an event to a
26
        // StateMachine that has been asked to Stop.
27
        ErrStateMachineShutdown = fmt.Errorf("StateMachine is shutting down")
28
)
29

30
// EmittedEvent is a special type that can be emitted by a state transition.
31
// This can contain internal events which are to be routed back to the state,
32
// or external events which are to be sent to the daemon.
33
type EmittedEvent[Event any] struct {
34
        // InternalEvent is an optional internal event that is to be routed
35
        // back to the target state. This enables state to trigger one or many
36
        // state transitions without a new external event.
37
        InternalEvent []Event
38

39
        // ExternalEvents is an optional set of external events to be sent to
40
        // the daemon for dispatch. Usually, this is some form of I/O.
41
        ExternalEvents DaemonEventSet
42
}
43

44
// StateTransition is a state transition type. It denotes the next state to go
45
// to, and also the set of events to emit.
46
type StateTransition[Event any, Env Environment] struct {
47
        // NextState is the next state to transition to.
48
        NextState State[Event, Env]
49

50
        // NewEvents is the set of events to emit.
51
        NewEvents fn.Option[EmittedEvent[Event]]
52
}
53

54
// Environment is an abstract interface that represents the environment that
55
// the state machine will execute using. From the PoV of the main state machine
56
// executor, we just care about being able to clean up any resources that were
57
// allocated by the environment.
58
type Environment interface {
59
        // Name returns the name of the environment. This is used to uniquely
60
        // identify the environment of related state machines.
61
        Name() string
62
}
63

64
// State defines an abstract state, namely its state transition function
65
// that takes as input an event and an environment, and returns a state
66
// transition (next state, and set of events to emit). A state can also either
67
// be terminal, or not; a terminal state causes state execution to halt.
68
type State[Event any, Env Environment] interface {
69
        // ProcessEvent takes an event and an environment, and returns a new
70
        // state transition. This will be iteratively called until either a
71
        // terminal state is reached, or no further internal events are
72
        // emitted.
73
        ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error)
74

75
        // IsTerminal returns true if this state is terminal, and false
76
        // otherwise.
77
        IsTerminal() bool
78

79
        // TODO(roasbeef): also add state serialization?
80
}
81

82
// DaemonAdapters is a set of methods that serve as adapters to bridge the
83
// pure world of the FSM to the real world of the daemon. These will be used to
84
// do things like broadcast transactions, or send messages to peers.
85
type DaemonAdapters interface {
86
        // SendMessages sends the target set of messages to the target peer.
87
        SendMessages(btcec.PublicKey, []lnwire.Message) error
88

89
        // BroadcastTransaction broadcasts a transaction with the target label.
90
        BroadcastTransaction(*wire.MsgTx, string) error
91

92
        // RegisterConfirmationsNtfn registers an intent to be notified once
93
        // txid reaches numConfs confirmations. We also pass in the pkScript as
94
        // the default light client instead needs to match on scripts created
95
        // in the block. If a nil txid is passed in, then not only should we
96
        // match on the script, but we should also dispatch once the
97
        // transaction containing the script reaches numConfs confirmations.
98
        // This can be useful in instances where we only know the script in
99
        // advance, but not the transaction containing it.
100
        //
101
        // TODO(roasbeef): could abstract further?
102
        RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte,
103
                numConfs, heightHint uint32,
104
                opts ...chainntnfs.NotifierOption,
105
        ) (*chainntnfs.ConfirmationEvent, error)
106

107
        // RegisterSpendNtfn registers an intent to be notified once the target
108
        // outpoint is successfully spent within a transaction. The script that
109
        // the outpoint creates must also be specified. This allows this
110
        // interface to be implemented by BIP 158-like filtering.
111
        RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte,
112
                heightHint uint32) (*chainntnfs.SpendEvent, error)
113
}
114

115
// stateQuery is used by outside callers to query the internal state of the
116
// state machine.
117
type stateQuery[Event any, Env Environment] struct {
118
        // CurrentState is a channel that will be sent the current state of the
119
        // state machine.
120
        CurrentState chan State[Event, Env]
121
}
122

123
// StateMachine represents an abstract FSM that is able to process new incoming
124
// events and drive a state machine to termination. This implementation uses
125
// type params to abstract over the types of events and environment. Events
126
// trigger new state transitions, that use the environment to perform some
127
// action.
128
//
129
// TODO(roasbeef): terminal check, daemon event execution, init?
130
type StateMachine[Event any, Env Environment] struct {
131
        cfg StateMachineCfg[Event, Env]
132

133
        // events is the channel that will be used to send new events to the
134
        // FSM.
135
        events chan Event
136

137
        // newStateEvents is an EventDistributor that will be used to notify
138
        // any relevant callers of new state transitions that occur.
139
        newStateEvents *fn.EventDistributor[State[Event, Env]]
140

141
        // stateQuery is a channel that will be used by outside callers to
142
        // query the internal state machine state.
143
        stateQuery chan stateQuery[Event, Env]
144

145
        wg   fn.GoroutineManager
146
        quit chan struct{}
147

148
        startOnce sync.Once
149
        stopOnce  sync.Once
150
}
151

152
// ErrorReporter is an interface that's used to report errors that occur during
153
// state machine execution.
154
type ErrorReporter interface {
155
        // ReportError is a method that's used to report an error that occurred
156
        // during state machine execution.
157
        ReportError(err error)
158
}
159

160
// StateMachineCfg is a configuration struct that's used to create a new state
161
// machine.
162
type StateMachineCfg[Event any, Env Environment] struct {
163
        // ErrorReporter is used to report errors that occur during state
164
        // transitions.
165
        ErrorReporter ErrorReporter
166

167
        // Daemon is a set of adapters that will be used to bridge the FSM to
168
        // the daemon.
169
        Daemon DaemonAdapters
170

171
        // InitialState is the initial state of the state machine.
172
        InitialState State[Event, Env]
173

174
        // Env is the environment that the state machine will use to execute.
175
        Env Env
176

177
        // InitEvent is an optional event that will be sent to the state
178
        // machine as if it was emitted at the onset of the state machine. This
179
        // can be used to set up tracking state such as a txid confirmation
180
        // event.
181
        InitEvent fn.Option[DaemonEvent]
182

183
        // MsgMapper is an optional message mapper that can be used to map
184
        // normal wire messages into FSM events.
185
        MsgMapper fn.Option[MsgMapper[Event]]
186

187
        // CustomPollInterval is an optional custom poll interval that can be
188
        // used to set a quicker interval for tests.
189
        CustomPollInterval fn.Option[time.Duration]
190
}
191

192
// NewStateMachine creates a new state machine given a set of daemon adapters,
193
// an initial state, an environment, and an event to process as if emitted at
194
// the onset of the state machine. Such an event can be used to set up tracking
195
// state such as a txid confirmation event.
196
func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env], //nolint:ll
197
) StateMachine[Event, Env] {
28✔
198
        return StateMachine[Event, Env]{
28✔
199
                cfg:            cfg,
28✔
200
                events:         make(chan Event, 1),
28✔
201
                stateQuery:     make(chan stateQuery[Event, Env]),
28✔
202
                wg:             *fn.NewGoroutineManager(),
28✔
203
                newStateEvents: fn.NewEventDistributor[State[Event, Env]](),
28✔
204
                quit:           make(chan struct{}),
28✔
205
        }
28✔
206
}
28✔
207

208
// Start starts the state machine. This will spawn a goroutine that will drive
209
// the state machine to completion.
210
func (s *StateMachine[Event, Env]) Start() {
28✔
211
        s.startOnce.Do(func() {
56✔
212
                _ = s.wg.Go(context.Background(), func(ctx context.Context) {
56✔
213
                        s.driveMachine()
28✔
214
                })
28✔
215
        })
216
}
217

218
// Stop stops the state machine. This will block until the state machine has
219
// reached a stopping point.
220
func (s *StateMachine[Event, Env]) Stop() {
37✔
221
        s.stopOnce.Do(func() {
65✔
222
                close(s.quit)
28✔
223
                s.wg.Stop()
28✔
224
        })
28✔
225
}
226

227
// SendEvent sends a new event to the state machine.
228
//
229
// TODO(roasbeef): bool if processed?
230
func (s *StateMachine[Event, Env]) SendEvent(event Event) {
38✔
231
        log.Debugf("FSM(%v): sending event: %v", s.cfg.Env.Name(),
38✔
232
                lnutils.SpewLogClosure(event),
38✔
233
        )
38✔
234

38✔
235
        select {
38✔
236
        case s.events <- event:
38✔
237
        case <-s.quit:
×
238
                return
×
239
        }
240
}
241

242
// CanHandle returns true if the target message can be routed to the state
243
// machine.
244
func (s *StateMachine[Event, Env]) CanHandle(msg lnwire.Message) bool {
2✔
245
        cfgMapper := s.cfg.MsgMapper
2✔
246
        return fn.MapOptionZ(cfgMapper, func(mapper MsgMapper[Event]) bool {
4✔
247
                return mapper.MapMsg(msg).IsSome()
2✔
248
        })
2✔
249
}
250

251
// Name returns the name of the state machine's environment.
252
func (s *StateMachine[Event, Env]) Name() string {
×
253
        return s.cfg.Env.Name()
×
254
}
×
255

256
// SendMessage attempts to send a wire message to the state machine. If the
257
// message can be mapped using the default message mapper, then true is
258
// returned indicating that the message was processed. Otherwise, false is
259
// returned.
260
func (s *StateMachine[Event, Env]) SendMessage(msg lnwire.Message) bool {
1✔
261
        // If we have no message mapper, then return false as we can't process
1✔
262
        // this message.
1✔
263
        if !s.cfg.MsgMapper.IsSome() {
1✔
264
                return false
×
265
        }
×
266

267
        log.Debugf("FSM(%v): sending msg: %v", s.cfg.Env.Name(),
1✔
268
                lnutils.SpewLogClosure(msg),
1✔
269
        )
1✔
270

1✔
271
        // Otherwise, try to map the message using the default message mapper.
1✔
272
        // If we can't extract an event, then we'll return false to indicate
1✔
273
        // that the message wasn't processed.
1✔
274
        var processed bool
1✔
275
        s.cfg.MsgMapper.WhenSome(func(mapper MsgMapper[Event]) {
2✔
276
                event := mapper.MapMsg(msg)
1✔
277

1✔
278
                event.WhenSome(func(event Event) {
2✔
279
                        s.SendEvent(event)
1✔
280

1✔
281
                        processed = true
1✔
282
                })
1✔
283
        })
284

285
        return processed
1✔
286
}
287

288
// CurrentState returns the current state of the state machine.
289
func (s *StateMachine[Event, Env]) CurrentState() (State[Event, Env], error) {
17✔
290
        query := stateQuery[Event, Env]{
17✔
291
                CurrentState: make(chan State[Event, Env], 1),
17✔
292
        }
17✔
293

17✔
294
        if !fn.SendOrQuit(s.stateQuery, query, s.quit) {
17✔
295
                return nil, ErrStateMachineShutdown
×
296
        }
×
297

298
        return fn.RecvOrTimeout(query.CurrentState, time.Second)
17✔
299
}
300

301
// StateSubscriber represents an active subscription to be notified of new
302
// state transitions.
303
type StateSubscriber[E any, F Environment] *fn.EventReceiver[State[E, F]]
304

305
// RegisterStateEvents registers a new event listener that will be notified of
306
// new state transitions.
307
func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[
308
        Event, Env] {
28✔
309

28✔
310
        subscriber := fn.NewEventReceiver[State[Event, Env]](10)
28✔
311

28✔
312
        // TODO(roasbeef): instead give the state and the input event?
28✔
313

28✔
314
        s.newStateEvents.RegisterSubscriber(subscriber)
28✔
315

28✔
316
        return subscriber
28✔
317
}
28✔
318

319
// RemoveStateSub removes the target state subscriber from the set of active
320
// subscribers.
321
func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[
322
        Event, Env]) {
28✔
323

28✔
324
        _ = s.newStateEvents.RemoveSubscriber(sub)
28✔
325
}
28✔
326

327
// executeDaemonEvent executes a daemon event, which is a special type of event
328
// that can be emitted as part of the state transition function of the state
329
// machine. An error is returned if the type of event is unknown.
330
//
331
//nolint:funlen
332
func (s *StateMachine[Event, Env]) executeDaemonEvent(
333
        event DaemonEvent) error {
47✔
334

47✔
335
        switch daemonEvent := event.(type) {
47✔
336
        // This is a send message event, so we'll send the event, and also mind
337
        // any preconditions as well as post-send events.
338
        case *SendMsgEvent[Event]:
17✔
339
                sendAndCleanUp := func() error {
33✔
340
                        log.Debugf("FSM(%v): sending message to target(%x): "+
16✔
341
                                "%v", s.cfg.Env.Name(),
16✔
342
                                daemonEvent.TargetPeer.SerializeCompressed(),
16✔
343
                                lnutils.SpewLogClosure(daemonEvent.Msgs),
16✔
344
                        )
16✔
345

16✔
346
                        err := s.cfg.Daemon.SendMessages(
16✔
347
                                daemonEvent.TargetPeer, daemonEvent.Msgs,
16✔
348
                        )
16✔
349
                        if err != nil {
16✔
350
                                return fmt.Errorf("unable to send msgs: %w",
×
351
                                        err)
×
352
                        }
×
353

354
                        // If a post-send event was specified, then we'll funnel
355
                        // that back into the main state machine now as well.
356
                        return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll
19✔
357
                                ctxClosure := func(ctx context.Context) {
6✔
358
                                        log.Debugf("FSM(%v): sending "+
3✔
359
                                                "post-send event: %v",
3✔
360
                                                s.cfg.Env.Name(),
3✔
361
                                                lnutils.SpewLogClosure(event),
3✔
362
                                        )
3✔
363

3✔
364
                                        s.SendEvent(event)
3✔
365
                                }
3✔
366

367
                                launched := s.wg.Go(
3✔
368
                                        context.Background(), ctxClosure,
3✔
369
                                )
3✔
370
                                if !launched {
3✔
371
                                        return ErrStateMachineShutdown
×
372
                                }
×
373

374
                                return nil
3✔
375
                        })
376
                }
377

378
                // If this doesn't have a SendWhen predicate, then we can just
379
                // send it off right away.
380
                if !daemonEvent.SendWhen.IsSome() {
29✔
381
                        return sendAndCleanUp()
12✔
382
                }
12✔
383

384
                // Otherwise, this has a SendWhen predicate, so we'll need
385
                // launch a goroutine to poll the SendWhen, then send only once
386
                // the predicate is true.
387
                bgCtx := context.Background()
5✔
388
                launched := s.wg.Go(bgCtx, func(ctx context.Context) {
10✔
389
                        predicateTicker := time.NewTicker(
5✔
390
                                s.cfg.CustomPollInterval.UnwrapOr(pollInterval),
5✔
391
                        )
5✔
392
                        defer predicateTicker.Stop()
5✔
393

5✔
394
                        log.Infof("FSM(%v): waiting for send predicate to "+
5✔
395
                                "be true", s.cfg.Env.Name())
5✔
396

5✔
397
                        for {
11✔
398
                                select {
6✔
399
                                case <-predicateTicker.C:
5✔
400
                                        canSend := fn.MapOptionZ(
5✔
401
                                                daemonEvent.SendWhen,
5✔
402
                                                func(pred SendPredicate) bool {
10✔
403
                                                        return pred()
5✔
404
                                                },
5✔
405
                                        )
406

407
                                        if canSend {
9✔
408
                                                log.Infof("FSM(%v): send "+
4✔
409
                                                        "active predicate",
4✔
410
                                                        s.cfg.Env.Name())
4✔
411

4✔
412
                                                err := sendAndCleanUp()
4✔
413
                                                if err != nil {
4✔
414
                                                        //nolint:ll
×
415
                                                        log.Errorf("FSM(%v): unable to send message: %v", err)
×
416
                                                }
×
417

418
                                                return
4✔
419
                                        }
420

421
                                case <-ctx.Done():
1✔
422
                                        return
1✔
423
                                }
424
                        }
425
                })
426

427
                if !launched {
5✔
428
                        return ErrStateMachineShutdown
×
429
                }
×
430

431
                return nil
5✔
432

433
        // If this is a broadcast transaction event, then we'll broadcast with
434
        // the label attached.
435
        case *BroadcastTxn:
6✔
436
                log.Debugf("FSM(%v): broadcasting txn, txid=%v",
6✔
437
                        s.cfg.Env.Name(), daemonEvent.Tx.TxHash())
6✔
438

6✔
439
                err := s.cfg.Daemon.BroadcastTransaction(
6✔
440
                        daemonEvent.Tx, daemonEvent.Label,
6✔
441
                )
6✔
442
                if err != nil {
6✔
443
                        return fmt.Errorf("unable to broadcast txn: %w", err)
×
444
                }
×
445

446
                return nil
6✔
447

448
        // The state machine has requested a new event to be sent once a
449
        // transaction spending a specified outpoint has confirmed.
450
        case *RegisterSpend[Event]:
24✔
451
                log.Debugf("FSM(%v): registering spend: %v", s.cfg.Env.Name(),
24✔
452
                        daemonEvent.OutPoint)
24✔
453

24✔
454
                spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn(
24✔
455
                        &daemonEvent.OutPoint, daemonEvent.PkScript,
24✔
456
                        daemonEvent.HeightHint,
24✔
457
                )
24✔
458
                if err != nil {
24✔
459
                        return fmt.Errorf("unable to register spend: %w", err)
×
460
                }
×
461

462
                bgCtx := context.Background()
24✔
463
                launched := s.wg.Go(bgCtx, func(ctx context.Context) {
48✔
464
                        for {
48✔
465
                                select {
24✔
466
                                case spend, ok := <-spendEvent.Spend:
×
467
                                        if !ok {
×
468
                                                return
×
469
                                        }
×
470

471
                                        // If there's a post-send event, then
472
                                        // we'll send that into the current
473
                                        // state now.
474
                                        postSpend := daemonEvent.PostSpendEvent
×
475
                                        postSpend.WhenSome(func(f SpendMapper[Event]) { //nolint:ll
×
476
                                                customEvent := f(spend)
×
477
                                                s.SendEvent(customEvent)
×
478
                                        })
×
479

480
                                        return
×
481

482
                                case <-ctx.Done():
24✔
483
                                        return
24✔
484
                                }
485
                        }
486
                })
487

488
                if !launched {
24✔
489
                        return ErrStateMachineShutdown
×
490
                }
×
491

492
                return nil
24✔
493

494
        // The state machine has requested a new event to be sent once a
495
        // specified txid+pkScript pair has confirmed.
496
        case *RegisterConf[Event]:
×
497
                log.Debugf("FSM(%v): registering conf: %v", s.cfg.Env.Name(),
×
498
                        daemonEvent.Txid)
×
499

×
500
                numConfs := daemonEvent.NumConfs.UnwrapOr(1)
×
501
                confEvent, err := s.cfg.Daemon.RegisterConfirmationsNtfn(
×
502
                        &daemonEvent.Txid, daemonEvent.PkScript,
×
503
                        numConfs, daemonEvent.HeightHint,
×
504
                )
×
505
                if err != nil {
×
506
                        return fmt.Errorf("unable to register conf: %w", err)
×
507
                }
×
508

NEW
509
                bgCtx := context.Background()
×
NEW
510
                launched := s.wg.Go(bgCtx, func(ctx context.Context) {
×
511
                        for {
×
512
                                select {
×
513
                                case <-confEvent.Confirmed:
×
514
                                        // If there's a post-conf event, then
×
515
                                        // we'll send that into the current
×
516
                                        // state now.
×
517
                                        //
×
518
                                        // TODO(roasbeef): refactor to
×
519
                                        // dispatchAfterRecv w/ above
×
520
                                        postConf := daemonEvent.PostConfEvent
×
521
                                        postConf.WhenSome(func(e Event) {
×
522
                                                s.SendEvent(e)
×
523
                                        })
×
524

525
                                        return
×
526

527
                                case <-ctx.Done():
×
528
                                        return
×
529
                                }
530
                        }
531
                })
532

533
                if !launched {
×
534
                        return ErrStateMachineShutdown
×
535
                }
×
536

537
                return nil
×
538
        }
539

540
        return fmt.Errorf("unknown daemon event: %T", event)
×
541
}
542

543
// applyEvents applies a new event to the state machine. This will continue
544
// until no further events are emitted by the state machine. Along the way,
545
// we'll also ensure to execute any daemon events that are emitted.
546
func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env],
547
        newEvent Event) (State[Event, Env], error) {
38✔
548

38✔
549
        log.Debugf("FSM(%v): applying new event", s.cfg.Env.Name(),
38✔
550
                lnutils.SpewLogClosure(newEvent),
38✔
551
        )
38✔
552
        eventQueue := fn.NewQueue(newEvent)
38✔
553

38✔
554
        // Given the next event to handle, we'll process the event, then add
38✔
555
        // any new emitted internal events to our event queue. This continues
38✔
556
        // until we reach a terminal state, or we run out of internal events to
38✔
557
        // process.
38✔
558
        //
38✔
559
        //nolint:ll
38✔
560
        for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() {
84✔
561
                err := fn.MapOptionZ(nextEvent, func(event Event) error {
92✔
562
                        log.Debugf("FSM(%v): processing event: %v",
46✔
563
                                s.cfg.Env.Name(),
46✔
564
                                lnutils.SpewLogClosure(event),
46✔
565
                        )
46✔
566

46✔
567
                        // Apply the state transition function of the current
46✔
568
                        // state given this new event and our existing env.
46✔
569
                        transition, err := currentState.ProcessEvent(
46✔
570
                                event, s.cfg.Env,
46✔
571
                        )
46✔
572
                        if err != nil {
55✔
573
                                return err
9✔
574
                        }
9✔
575

576
                        newEvents := transition.NewEvents
37✔
577
                        err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error { //nolint:ll
63✔
578
                                // With the event processed, we'll process any
26✔
579
                                // new daemon events that were emitted as part
26✔
580
                                // of this new state transition.
26✔
581
                                for _, dEvent := range events.ExternalEvents {
48✔
582
                                        err := s.executeDaemonEvent(
22✔
583
                                                dEvent,
22✔
584
                                        )
22✔
585
                                        if err != nil {
22✔
586
                                                return err
×
587
                                        }
×
588
                                }
589

590
                                // Next, we'll add any new emitted events to our
591
                                // event queue.
592
                                //
593
                                //nolint:ll
594
                                for _, inEvent := range events.InternalEvent {
34✔
595
                                        log.Debugf("FSM(%v): adding "+
8✔
596
                                                "new internal event "+
8✔
597
                                                "to queue: %v",
8✔
598
                                                s.cfg.Env.Name(),
8✔
599
                                                lnutils.SpewLogClosure(
8✔
600
                                                        inEvent,
8✔
601
                                                ),
8✔
602
                                        )
8✔
603

8✔
604
                                        eventQueue.Enqueue(inEvent)
8✔
605
                                }
8✔
606

607
                                return nil
26✔
608
                        })
609
                        if err != nil {
37✔
610
                                return err
×
611
                        }
×
612

613
                        log.Infof("FSM(%v): state transition: from_state=%T, "+
37✔
614
                                "to_state=%T",
37✔
615
                                s.cfg.Env.Name(), currentState,
37✔
616
                                transition.NextState)
37✔
617

37✔
618
                        // With our events processed, we'll now update our
37✔
619
                        // internal state.
37✔
620
                        currentState = transition.NextState
37✔
621

37✔
622
                        // Notify our subscribers of the new state transition.
37✔
623
                        //
37✔
624
                        // TODO(roasbeef): will only give us the outer state?
37✔
625
                        //  * let FSMs choose which state to emit?
37✔
626
                        s.newStateEvents.NotifySubscribers(currentState)
37✔
627

37✔
628
                        return nil
37✔
629
                })
630
                if err != nil {
55✔
631
                        return currentState, err
9✔
632
                }
9✔
633
        }
634

635
        return currentState, nil
29✔
636
}
637

638
// driveMachine is the main event loop of the state machine. It accepts any new
639
// incoming events, and then drives the state machine forward until it reaches
640
// a terminal state.
641
func (s *StateMachine[Event, Env]) driveMachine() {
28✔
642
        log.Debugf("FSM(%v): starting state machine", s.cfg.Env.Name())
28✔
643

28✔
644
        currentState := s.cfg.InitialState
28✔
645

28✔
646
        // Before we start, if we have an init daemon event specified, then
28✔
647
        // we'll handle that now.
28✔
648
        err := fn.MapOptionZ(s.cfg.InitEvent, func(event DaemonEvent) error {
53✔
649
                return s.executeDaemonEvent(event)
25✔
650
        })
25✔
651
        if err != nil {
28✔
652
                log.Errorf("unable to execute init event: %w", err)
×
653
                return
×
654
        }
×
655

656
        // We just started driving the state machine, so we'll notify our
657
        // subscribers of this starting state.
658
        s.newStateEvents.NotifySubscribers(currentState)
28✔
659

28✔
660
        for {
102✔
661
                select {
74✔
662
                // We have a new external event, so we'll drive the state
663
                // machine forward until we either run out of internal events,
664
                // or we reach a terminal state.
665
                case newEvent := <-s.events:
38✔
666
                        newState, err := s.applyEvents(currentState, newEvent)
38✔
667
                        if err != nil {
47✔
668
                                s.cfg.ErrorReporter.ReportError(err)
9✔
669

9✔
670
                                log.Errorf("unable to apply event: %v", err)
9✔
671

9✔
672
                                // An error occurred, so we'll tear down the
9✔
673
                                // entire state machine as we can't proceed.
9✔
674
                                go s.Stop()
9✔
675

9✔
676
                                return
9✔
677
                        }
9✔
678

679
                        currentState = newState
29✔
680

681
                // An outside caller is querying our state, so we'll return the
682
                // latest state.
683
                case stateQuery := <-s.stateQuery:
17✔
684
                        if !fn.SendOrQuit(stateQuery.CurrentState, currentState, s.quit) { //nolint:ll
17✔
685
                                return
×
686
                        }
×
687

688
                case <-s.wg.Done():
19✔
689
                        return
19✔
690
                }
691
        }
692
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc