• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12115818427

02 Dec 2024 08:53AM UTC coverage: 58.778% (-0.2%) from 58.948%
12115818427

push

github

web-flow
Merge pull request #9314 from ellemouton/slog1

build+lnd+docs: start using slog and add commit_hash to log lines

123 of 234 new or added lines in 38 files covered. (52.56%)

425 existing lines in 27 files now uncovered.

133081 of 226412 relevant lines covered (58.78%)

19439.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.41
/protofsm/state_machine.go
1
package protofsm
2

3
import (
4
        "context"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "github.com/btcsuite/btcd/btcec/v2"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/lightningnetwork/lnd/chainntnfs"
13
        "github.com/lightningnetwork/lnd/fn"
14
        "github.com/lightningnetwork/lnd/lnutils"
15
        "github.com/lightningnetwork/lnd/lnwire"
16
)
17

18
const (
19
        // pollInterval is the interval at which we'll poll the SendWhen
20
        // predicate if specified.
21
        pollInterval = time.Millisecond * 100
22
)
23

24
// EmittedEvent is a special type that can be emitted by a state transition.
25
// This can container internal events which are to be routed back to the state,
26
// or external events which are to be sent to the daemon.
27
type EmittedEvent[Event any] struct {
28
        // InternalEvent is an optional internal event that is to be routed
29
        // back to the target state. This enables state to trigger one or many
30
        // state transitions without a new external event.
31
        InternalEvent fn.Option[[]Event]
32

33
        // ExternalEvent is an optional external event that is to be sent to
34
        // the daemon for dispatch. Usually, this is some form of I/O.
35
        ExternalEvents fn.Option[DaemonEventSet]
36
}
37

38
// StateTransition is a state transition type. It denotes the next state to go
39
// to, and also the set of events to emit.
40
type StateTransition[Event any, Env Environment] struct {
41
        // NextState is the next state to transition to.
42
        NextState State[Event, Env]
43

44
        // NewEvents is the set of events to emit.
45
        NewEvents fn.Option[EmittedEvent[Event]]
46
}
47

48
// Environment is an abstract interface that represents the environment that
49
// the state machine will execute using. From the PoV of the main state machine
50
// executor, we just care about being able to clean up any resources that were
51
// allocated by the environment.
52
type Environment interface {
53
        // Name returns the name of the environment. This is used to uniquely
54
        // identify the environment of related state machines.
55
        Name() string
56
}
57

58
// State defines an abstract state along, namely its state transition function
59
// that takes as input an event and an environment, and returns a state
60
// transition (next state, and set of events to emit). As state can also either
61
// be terminal, or not, a terminal event causes state execution to halt.
62
type State[Event any, Env Environment] interface {
63
        // ProcessEvent takes an event and an environment, and returns a new
64
        // state transition. This will be iteratively called until either a
65
        // terminal state is reached, or no further internal events are
66
        // emitted.
67
        ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error)
68

69
        // IsTerminal returns true if this state is terminal, and false
70
        // otherwise.
71
        IsTerminal() bool
72

73
        // TODO(roasbeef): also add state serialization?
74
}
75

76
// DaemonAdapters is a set of methods that server as adapters to bridge the
77
// pure world of the FSM to the real world of the daemon. These will be used to
78
// do things like broadcast transactions, or send messages to peers.
79
type DaemonAdapters interface {
80
        // SendMessages sends the target set of messages to the target peer.
81
        SendMessages(btcec.PublicKey, []lnwire.Message) error
82

83
        // BroadcastTransaction broadcasts a transaction with the target label.
84
        BroadcastTransaction(*wire.MsgTx, string) error
85

86
        // RegisterConfirmationsNtfn registers an intent to be notified once
87
        // txid reaches numConfs confirmations. We also pass in the pkScript as
88
        // the default light client instead needs to match on scripts created
89
        // in the block. If a nil txid is passed in, then not only should we
90
        // match on the script, but we should also dispatch once the
91
        // transaction containing the script reaches numConfs confirmations.
92
        // This can be useful in instances where we only know the script in
93
        // advance, but not the transaction containing it.
94
        //
95
        // TODO(roasbeef): could abstract further?
96
        RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte,
97
                numConfs, heightHint uint32,
98
                opts ...chainntnfs.NotifierOption,
99
        ) (*chainntnfs.ConfirmationEvent, error)
100

101
        // RegisterSpendNtfn registers an intent to be notified once the target
102
        // outpoint is successfully spent within a transaction. The script that
103
        // the outpoint creates must also be specified. This allows this
104
        // interface to be implemented by BIP 158-like filtering.
105
        RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte,
106
                heightHint uint32) (*chainntnfs.SpendEvent, error)
107
}
108

109
// stateQuery is used by outside callers to query the internal state of the
110
// state machine.
111
type stateQuery[Event any, Env Environment] struct {
112
        // CurrentState is a channel that will be sent the current state of the
113
        // state machine.
114
        CurrentState chan State[Event, Env]
115
}
116

117
// StateMachine represents an abstract FSM that is able to process new incoming
118
// events and drive a state machine to termination. This implementation uses
119
// type params to abstract over the types of events and environment. Events
120
// trigger new state transitions, that use the environment to perform some
121
// action.
122
//
123
// TODO(roasbeef): terminal check, daemon event execution, init?
124
type StateMachine[Event any, Env Environment] struct {
125
        cfg StateMachineCfg[Event, Env]
126

127
        // events is the channel that will be used to send new events to the
128
        // FSM.
129
        events chan Event
130

131
        // newStateEvents is an EventDistributor that will be used to notify
132
        // any relevant callers of new state transitions that occur.
133
        newStateEvents *fn.EventDistributor[State[Event, Env]]
134

135
        // stateQuery is a channel that will be used by outside callers to
136
        // query the internal state machine state.
137
        stateQuery chan stateQuery[Event, Env]
138

139
        wg   fn.GoroutineManager
140
        quit chan struct{}
141

142
        startOnce sync.Once
143
        stopOnce  sync.Once
144
}
145

146
// ErrorReporter is an interface that's used to report errors that occur during
147
// state machine execution.
148
type ErrorReporter interface {
149
        // ReportError is a method that's used to report an error that occurred
150
        // during state machine execution.
151
        ReportError(err error)
152
}
153

154
// StateMachineCfg is a configuration struct that's used to create a new state
155
// machine.
156
type StateMachineCfg[Event any, Env Environment] struct {
157
        // ErrorReporter is used to report errors that occur during state
158
        // transitions.
159
        ErrorReporter ErrorReporter
160

161
        // Daemon is a set of adapters that will be used to bridge the FSM to
162
        // the daemon.
163
        Daemon DaemonAdapters
164

165
        // InitialState is the initial state of the state machine.
166
        InitialState State[Event, Env]
167

168
        // Env is the environment that the state machine will use to execute.
169
        Env Env
170

171
        // InitEvent is an optional event that will be sent to the state
172
        // machine as if it was emitted at the onset of the state machine. This
173
        // can be used to set up tracking state such as a txid confirmation
174
        // event.
175
        InitEvent fn.Option[DaemonEvent]
176

177
        // MsgMapper is an optional message mapper that can be used to map
178
        // normal wire messages into FSM events.
179
        MsgMapper fn.Option[MsgMapper[Event]]
180

181
        // CustomPollInterval is an optional custom poll interval that can be
182
        // used to set a quicker interval for tests.
183
        CustomPollInterval fn.Option[time.Duration]
184
}
185

186
// NewStateMachine creates a new state machine given a set of daemon adapters,
187
// an initial state, an environment, and an event to process as if emitted at
188
// the onset of the state machine. Such an event can be used to set up tracking
189
// state such as a txid confirmation event.
190
func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env], //nolint:ll
191
) StateMachine[Event, Env] {
4✔
192

4✔
193
        return StateMachine[Event, Env]{
4✔
194
                cfg:            cfg,
4✔
195
                events:         make(chan Event, 1),
4✔
196
                stateQuery:     make(chan stateQuery[Event, Env]),
4✔
197
                wg:             *fn.NewGoroutineManager(context.Background()),
4✔
198
                newStateEvents: fn.NewEventDistributor[State[Event, Env]](),
4✔
199
                quit:           make(chan struct{}),
4✔
200
        }
4✔
201
}
4✔
202

203
// Start starts the state machine. This will spawn a goroutine that will drive
204
// the state machine to completion.
205
func (s *StateMachine[Event, Env]) Start() {
4✔
206
        s.startOnce.Do(func() {
8✔
207
                _ = s.wg.Go(func(ctx context.Context) {
8✔
208
                        s.driveMachine()
4✔
209
                })
4✔
210
        })
211
}
212

213
// Stop stops the state machine. This will block until the state machine has
214
// reached a stopping point.
215
func (s *StateMachine[Event, Env]) Stop() {
4✔
216
        s.stopOnce.Do(func() {
8✔
217
                close(s.quit)
4✔
218
                s.wg.Stop()
4✔
219
        })
4✔
220
}
221

222
// SendEvent sends a new event to the state machine.
223
//
224
// TODO(roasbeef): bool if processed?
225
func (s *StateMachine[Event, Env]) SendEvent(event Event) {
5✔
226
        log.Debugf("FSM(%v): sending event: %v", s.cfg.Env.Name(),
5✔
227
                lnutils.SpewLogClosure(event),
5✔
228
        )
5✔
229

5✔
230
        select {
5✔
231
        case s.events <- event:
5✔
232
        case <-s.quit:
×
233
                return
×
234
        }
235
}
236

237
// CanHandle returns true if the target message can be routed to the state
238
// machine.
239
func (s *StateMachine[Event, Env]) CanHandle(msg lnwire.Message) bool {
2✔
240
        cfgMapper := s.cfg.MsgMapper
2✔
241
        return fn.MapOptionZ(cfgMapper, func(mapper MsgMapper[Event]) bool {
4✔
242
                return mapper.MapMsg(msg).IsSome()
2✔
243
        })
2✔
244
}
245

246
// Name returns the name of the state machine's environment.
247
func (s *StateMachine[Event, Env]) Name() string {
×
248
        return s.cfg.Env.Name()
×
249
}
×
250

251
// SendMessage attempts to send a wire message to the state machine. If the
252
// message can be mapped using the default message mapper, then true is
253
// returned indicating that the message was processed. Otherwise, false is
254
// returned.
255
func (s *StateMachine[Event, Env]) SendMessage(msg lnwire.Message) bool {
1✔
256
        // If we have no message mapper, then return false as we can't process
1✔
257
        // this message.
1✔
258
        if !s.cfg.MsgMapper.IsSome() {
1✔
259
                return false
×
260
        }
×
261

262
        log.Debugf("FSM(%v): sending msg: %v", s.cfg.Env.Name(),
1✔
263
                lnutils.SpewLogClosure(msg),
1✔
264
        )
1✔
265

1✔
266
        // Otherwise, try to map the message using the default message mapper.
1✔
267
        // If we can't extract an event, then we'll return false to indicate
1✔
268
        // that the message wasn't processed.
1✔
269
        var processed bool
1✔
270
        s.cfg.MsgMapper.WhenSome(func(mapper MsgMapper[Event]) {
2✔
271
                event := mapper.MapMsg(msg)
1✔
272

1✔
273
                event.WhenSome(func(event Event) {
2✔
274
                        s.SendEvent(event)
1✔
275

1✔
276
                        processed = true
1✔
277
                })
1✔
278
        })
279

280
        return processed
1✔
281
}
282

283
// CurrentState returns the current state of the state machine.
284
func (s *StateMachine[Event, Env]) CurrentState() (State[Event, Env], error) {
1✔
285
        query := stateQuery[Event, Env]{
1✔
286
                CurrentState: make(chan State[Event, Env], 1),
1✔
287
        }
1✔
288

1✔
289
        if !fn.SendOrQuit(s.stateQuery, query, s.quit) {
1✔
290
                return nil, fmt.Errorf("state machine is shutting down")
×
291
        }
×
292

293
        return fn.RecvOrTimeout(query.CurrentState, time.Second)
1✔
294
}
295

296
// StateSubscriber represents an active subscription to be notified of new
297
// state transitions.
298
type StateSubscriber[E any, F Environment] *fn.EventReceiver[State[E, F]]
299

300
// RegisterStateEvents registers a new event listener that will be notified of
301
// new state transitions.
302
func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[
303
        Event, Env] {
4✔
304

4✔
305
        subscriber := fn.NewEventReceiver[State[Event, Env]](10)
4✔
306

4✔
307
        // TODO(roasbeef): instead give the state and the input event?
4✔
308

4✔
309
        s.newStateEvents.RegisterSubscriber(subscriber)
4✔
310

4✔
311
        return subscriber
4✔
312
}
4✔
313

314
// RemoveStateSub removes the target state subscriber from the set of active
315
// subscribers.
316
func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[
317
        Event, Env]) {
4✔
318

4✔
319
        _ = s.newStateEvents.RemoveSubscriber(sub)
4✔
320
}
4✔
321

322
// executeDaemonEvent executes a daemon event, which is a special type of event
323
// that can be emitted as part of the state transition function of the state
324
// machine. An error is returned if the type of event is unknown.
325
func (s *StateMachine[Event, Env]) executeDaemonEvent(
326
        event DaemonEvent) error {
4✔
327

4✔
328
        switch daemonEvent := event.(type) {
4✔
329
        // This is a send message event, so we'll send the event, and also mind
330
        // any preconditions as well as post-send events.
331
        case *SendMsgEvent[Event]:
3✔
332
                sendAndCleanUp := func() error {
6✔
333
                        log.Debugf("FSM(%v): sending message to target(%x): "+
3✔
334
                                "%v", s.cfg.Env.Name(),
3✔
335
                                daemonEvent.TargetPeer.SerializeCompressed(),
3✔
336
                                lnutils.SpewLogClosure(daemonEvent.Msgs),
3✔
337
                        )
3✔
338

3✔
339
                        err := s.cfg.Daemon.SendMessages(
3✔
340
                                daemonEvent.TargetPeer, daemonEvent.Msgs,
3✔
341
                        )
3✔
342
                        if err != nil {
3✔
343
                                return fmt.Errorf("unable to send msgs: %w",
×
344
                                        err)
×
345
                        }
×
346

347
                        // If a post-send event was specified, then we'll funnel
348
                        // that back into the main state machine now as well.
349
                        return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll
5✔
350
                                return s.wg.Go(func(ctx context.Context) {
4✔
351
                                        log.Debugf("FSM(%v): sending "+
2✔
352
                                                "post-send event: %v",
2✔
353
                                                s.cfg.Env.Name(),
2✔
354
                                                lnutils.SpewLogClosure(event),
2✔
355
                                        )
2✔
356

2✔
357
                                        s.SendEvent(event)
2✔
358
                                })
2✔
359
                        })
360
                }
361

362
                // If this doesn't have a SendWhen predicate, then we can just
363
                // send it off right away.
364
                if !daemonEvent.SendWhen.IsSome() {
5✔
365
                        return sendAndCleanUp()
2✔
366
                }
2✔
367

368
                // Otherwise, this has a SendWhen predicate, so we'll need
369
                // launch a goroutine to poll the SendWhen, then send only once
370
                // the predicate is true.
371
                return s.wg.Go(func(ctx context.Context) {
2✔
372
                        predicateTicker := time.NewTicker(
1✔
373
                                s.cfg.CustomPollInterval.UnwrapOr(pollInterval),
1✔
374
                        )
1✔
375
                        defer predicateTicker.Stop()
1✔
376

1✔
377
                        log.Infof("FSM(%v): waiting for send predicate to "+
1✔
378
                                "be true", s.cfg.Env.Name())
1✔
379

1✔
380
                        for {
2✔
381
                                select {
1✔
382
                                case <-predicateTicker.C:
1✔
383
                                        canSend := fn.MapOptionZ(
1✔
384
                                                daemonEvent.SendWhen,
1✔
385
                                                func(pred SendPredicate) bool {
2✔
386
                                                        return pred()
1✔
387
                                                },
1✔
388
                                        )
389

390
                                        if canSend {
2✔
391
                                                log.Infof("FSM(%v): send "+
1✔
392
                                                        "active predicate",
1✔
393
                                                        s.cfg.Env.Name())
1✔
394

1✔
395
                                                err := sendAndCleanUp()
1✔
396
                                                if err != nil {
1✔
NEW
397
                                                        //nolint:ll
×
398
                                                        log.Errorf("FSM(%v): unable to send message: %v", err)
×
399
                                                }
×
400

401
                                                return
1✔
402
                                        }
403

404
                                case <-ctx.Done():
×
405
                                        return
×
406
                                }
407
                        }
408
                })
409

410
        // If this is a broadcast transaction event, then we'll broadcast with
411
        // the label attached.
412
        case *BroadcastTxn:
1✔
413
                log.Debugf("FSM(%v): broadcasting txn, txid=%v",
1✔
414
                        s.cfg.Env.Name(), daemonEvent.Tx.TxHash())
1✔
415

1✔
416
                err := s.cfg.Daemon.BroadcastTransaction(
1✔
417
                        daemonEvent.Tx, daemonEvent.Label,
1✔
418
                )
1✔
419
                if err != nil {
1✔
420
                        return fmt.Errorf("unable to broadcast txn: %w", err)
×
421
                }
×
422

423
                return nil
1✔
424

425
        // The state machine has requested a new event to be sent once a
426
        // transaction spending a specified outpoint has confirmed.
427
        case *RegisterSpend[Event]:
×
428
                log.Debugf("FSM(%v): registering spend: %v", s.cfg.Env.Name(),
×
429
                        daemonEvent.OutPoint)
×
430

×
431
                spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn(
×
432
                        &daemonEvent.OutPoint, daemonEvent.PkScript,
×
433
                        daemonEvent.HeightHint,
×
434
                )
×
435
                if err != nil {
×
436
                        return fmt.Errorf("unable to register spend: %w", err)
×
437
                }
×
438

439
                return s.wg.Go(func(ctx context.Context) {
×
440
                        for {
×
441
                                select {
×
442
                                case spend, ok := <-spendEvent.Spend:
×
443
                                        if !ok {
×
444
                                                return
×
445
                                        }
×
446

447
                                        // If there's a post-send event, then
448
                                        // we'll send that into the current
449
                                        // state now.
450
                                        postSpend := daemonEvent.PostSpendEvent
×
NEW
451
                                        postSpend.WhenSome(func(f SpendMapper[Event]) { //nolint:ll
×
452
                                                customEvent := f(spend)
×
453
                                                s.SendEvent(customEvent)
×
454
                                        })
×
455

456
                                        return
×
457

458
                                case <-ctx.Done():
×
459
                                        return
×
460
                                }
461
                        }
462
                })
463

464
        // The state machine has requested a new event to be sent once a
465
        // specified txid+pkScript pair has confirmed.
466
        case *RegisterConf[Event]:
×
467
                log.Debugf("FSM(%v): registering conf: %v", s.cfg.Env.Name(),
×
468
                        daemonEvent.Txid)
×
469

×
470
                numConfs := daemonEvent.NumConfs.UnwrapOr(1)
×
471
                confEvent, err := s.cfg.Daemon.RegisterConfirmationsNtfn(
×
472
                        &daemonEvent.Txid, daemonEvent.PkScript,
×
473
                        numConfs, daemonEvent.HeightHint,
×
474
                )
×
475
                if err != nil {
×
476
                        return fmt.Errorf("unable to register conf: %w", err)
×
477
                }
×
478

479
                return s.wg.Go(func(ctx context.Context) {
×
480
                        for {
×
481
                                select {
×
482
                                case <-confEvent.Confirmed:
×
483
                                        // If there's a post-conf event, then
×
484
                                        // we'll send that into the current
×
485
                                        // state now.
×
486
                                        //
×
487
                                        // TODO(roasbeef): refactor to
×
488
                                        // dispatchAfterRecv w/ above
×
489
                                        postConf := daemonEvent.PostConfEvent
×
490
                                        postConf.WhenSome(func(e Event) {
×
491
                                                s.SendEvent(e)
×
492
                                        })
×
493

494
                                        return
×
495

496
                                case <-ctx.Done():
×
497
                                        return
×
498
                                }
499
                        }
500
                })
501
        }
502

503
        return fmt.Errorf("unknown daemon event: %T", event)
×
504
}
505

506
// applyEvents applies a new event to the state machine. This will continue
507
// until no further events are emitted by the state machine. Along the way,
508
// we'll also ensure to execute any daemon events that are emitted.
509
func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env],
510
        newEvent Event) (State[Event, Env], error) {
5✔
511

5✔
512
        log.Debugf("FSM(%v): applying new event", s.cfg.Env.Name(),
5✔
513
                lnutils.SpewLogClosure(newEvent),
5✔
514
        )
5✔
515
        eventQueue := fn.NewQueue(newEvent)
5✔
516

5✔
517
        // Given the next event to handle, we'll process the event, then add
5✔
518
        // any new emitted internal events to our event queue. This continues
5✔
519
        // until we reach a terminal state, or we run out of internal events to
5✔
520
        // process.
5✔
521
        //
5✔
522
        //nolint:ll
5✔
523
        for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() {
11✔
524
                err := fn.MapOptionZ(nextEvent, func(event Event) error {
12✔
525
                        log.Debugf("FSM(%v): processing event: %v",
6✔
526
                                s.cfg.Env.Name(),
6✔
527
                                lnutils.SpewLogClosure(event),
6✔
528
                        )
6✔
529

6✔
530
                        // Apply the state transition function of the current
6✔
531
                        // state given this new event and our existing env.
6✔
532
                        transition, err := currentState.ProcessEvent(
6✔
533
                                event, s.cfg.Env,
6✔
534
                        )
6✔
535
                        if err != nil {
6✔
536
                                return err
×
537
                        }
×
538

539
                        newEvents := transition.NewEvents
6✔
540
                        err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error { //nolint:ll
8✔
541
                                // With the event processed, we'll process any
2✔
542
                                // new daemon events that were emitted as part
2✔
543
                                // of this new state transition.
2✔
544
                                //
2✔
545
                                //nolint:ll
2✔
546
                                err := fn.MapOptionZ(events.ExternalEvents, func(dEvents DaemonEventSet) error {
3✔
547
                                        log.Debugf("FSM(%v): processing "+
1✔
548
                                                "daemon %v daemon events",
1✔
549
                                                s.cfg.Env.Name(), len(dEvents))
1✔
550

1✔
551
                                        for _, dEvent := range dEvents {
4✔
552
                                                err := s.executeDaemonEvent(
3✔
553
                                                        dEvent,
3✔
554
                                                )
3✔
555
                                                if err != nil {
3✔
556
                                                        return err
×
557
                                                }
×
558
                                        }
559

560
                                        return nil
1✔
561
                                })
562
                                if err != nil {
2✔
563
                                        return err
×
564
                                }
×
565

566
                                // Next, we'll add any new emitted events to
567
                                // our event queue.
568
                                //
569
                                //nolint:ll
570
                                events.InternalEvent.WhenSome(func(es []Event) {
3✔
571
                                        for _, inEvent := range es {
2✔
572
                                                log.Debugf("FSM(%v): adding "+
1✔
573
                                                        "new internal event "+
1✔
574
                                                        "to queue: %v",
1✔
575
                                                        s.cfg.Env.Name(),
1✔
576
                                                        lnutils.SpewLogClosure(
1✔
577
                                                                inEvent,
1✔
578
                                                        ),
1✔
579
                                                )
1✔
580

1✔
581
                                                eventQueue.Enqueue(inEvent)
1✔
582
                                        }
1✔
583
                                })
584

585
                                return nil
2✔
586
                        })
587
                        if err != nil {
6✔
588
                                return err
×
589
                        }
×
590

591
                        log.Infof("FSM(%v): state transition: from_state=%T, "+
6✔
592
                                "to_state=%T",
6✔
593
                                s.cfg.Env.Name(), currentState,
6✔
594
                                transition.NextState)
6✔
595

6✔
596
                        // With our events processed, we'll now update our
6✔
597
                        // internal state.
6✔
598
                        currentState = transition.NextState
6✔
599

6✔
600
                        // Notify our subscribers of the new state transition.
6✔
601
                        //
6✔
602
                        // TODO(roasbeef): will only give us the outer state?
6✔
603
                        //  * let FSMs choose which state to emit?
6✔
604
                        s.newStateEvents.NotifySubscribers(currentState)
6✔
605

6✔
606
                        return nil
6✔
607
                })
608
                if err != nil {
6✔
609
                        return currentState, err
×
610
                }
×
611
        }
612

613
        return currentState, nil
5✔
614
}
615

616
// driveMachine is the main event loop of the state machine. It accepts any new
617
// incoming events, and then drives the state machine forward until it reaches
618
// a terminal state.
619
func (s *StateMachine[Event, Env]) driveMachine() {
4✔
620
        log.Debugf("FSM(%v): starting state machine", s.cfg.Env.Name())
4✔
621

4✔
622
        currentState := s.cfg.InitialState
4✔
623

4✔
624
        // Before we start, if we have an init daemon event specified, then
4✔
625
        // we'll handle that now.
4✔
626
        err := fn.MapOptionZ(s.cfg.InitEvent, func(event DaemonEvent) error {
5✔
627
                return s.executeDaemonEvent(event)
1✔
628
        })
1✔
629
        if err != nil {
4✔
630
                log.Errorf("unable to execute init event: %w", err)
×
631
                return
×
632
        }
×
633

634
        // We just started driving the state machine, so we'll notify our
635
        // subscribers of this starting state.
636
        s.newStateEvents.NotifySubscribers(currentState)
4✔
637

4✔
638
        for {
14✔
639
                select {
10✔
640
                // We have a new external event, so we'll drive the state
641
                // machine forward until we either run out of internal events,
642
                // or we reach a terminal state.
643
                case newEvent := <-s.events:
5✔
644
                        newState, err := s.applyEvents(currentState, newEvent)
5✔
645
                        if err != nil {
5✔
646
                                s.cfg.ErrorReporter.ReportError(err)
×
647

×
648
                                log.Errorf("unable to apply event: %v", err)
×
649

×
650
                                // An error occurred, so we'll tear down the
×
651
                                // entire state machine as we can't proceed.
×
652
                                go s.Stop()
×
653

×
654
                                return
×
655
                        }
×
656

657
                        currentState = newState
5✔
658

659
                // An outside caller is querying our state, so we'll return the
660
                // latest state.
661
                case stateQuery := <-s.stateQuery:
1✔
662
                        if !fn.SendOrQuit(stateQuery.CurrentState, currentState, s.quit) { //nolint:ll
1✔
663
                                return
×
664
                        }
×
665

666
                case <-s.wg.Done():
4✔
667
                        return
4✔
668
                }
669
        }
670
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc