• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13035292482

29 Jan 2025 03:59PM UTC coverage: 49.3% (-9.5%) from 58.777%
13035292482

Pull #9456

github

mohamedawnallah
docs: update release-notes-0.19.0.md

In this commit, we warn users about the removal
of RPCs `SendToRoute`, `SendToRouteSync`, `SendPayment`,
and `SendPaymentSync` in the next release 0.20.
Pull Request #9456: lnrpc+docs: deprecate warning `SendToRoute`, `SendToRouteSync`, `SendPayment`, and `SendPaymentSync` in Release 0.19

100634 of 204126 relevant lines covered (49.3%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/protofsm/state_machine.go
1
package protofsm
2

3
import (
4
        "context"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "github.com/btcsuite/btcd/btcec/v2"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/btcsuite/btclog/v2"
13
        "github.com/lightningnetwork/lnd/chainntnfs"
14
        "github.com/lightningnetwork/lnd/fn/v2"
15
        "github.com/lightningnetwork/lnd/lnutils"
16
        "github.com/lightningnetwork/lnd/lnwire"
17
)
18

19
const (
20
        // pollInterval is the interval at which we'll poll the SendWhen
21
        // predicate if specified.
22
        pollInterval = time.Millisecond * 100
23
)
24

25
var (
26
        // ErrStateMachineShutdown occurs when trying to feed an event to a
27
        // StateMachine that has been asked to Stop.
28
        ErrStateMachineShutdown = fmt.Errorf("StateMachine is shutting down")
29
)
30

31
// EmittedEvent is a special type that can be emitted by a state transition.
32
// This can contain internal events which are to be routed back to the state,
33
// or external events which are to be sent to the daemon.
34
type EmittedEvent[Event any] struct {
35
        // InternalEvent is an optional internal event that is to be routed
36
        // back to the target state. This enables state to trigger one or many
37
        // state transitions without a new external event.
38
        InternalEvent []Event
39

40
        // ExternalEvents is an optional set of external events to be sent to
41
        // the daemon for dispatch. Usually, this is some form of I/O.
42
        ExternalEvents DaemonEventSet
43
}
44

45
// StateTransition is a state transition type. It denotes the next state to go
46
// to, and also the set of events to emit.
47
type StateTransition[Event any, Env Environment] struct {
48
        // NextState is the next state to transition to.
49
        NextState State[Event, Env]
50

51
        // NewEvents is the set of events to emit.
52
        NewEvents fn.Option[EmittedEvent[Event]]
53
}
54

55
// Environment is an abstract interface that represents the environment that
56
// the state machine will execute using. From the PoV of the main state machine
57
// executor, we just care about being able to clean up any resources that were
58
// allocated by the environment.
59
type Environment interface {
60
        // Name returns the name of the environment. This is used to uniquely
61
        // identify the environment of related state machines.
62
        Name() string
63
}
64

65
// State defines an abstract state along with its state transition function
66
// that takes as input an event and an environment, and returns a state
67
// transition (next state, and set of events to emit). A state can also either
68
// be terminal, or not; a terminal state causes state execution to halt.
69
type State[Event any, Env Environment] interface {
70
        // ProcessEvent takes an event and an environment, and returns a new
71
        // state transition. This will be iteratively called until either a
72
        // terminal state is reached, or no further internal events are
73
        // emitted.
74
        ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error)
75

76
        // IsTerminal returns true if this state is terminal, and false
77
        // otherwise.
78
        IsTerminal() bool
79

80
        // TODO(roasbeef): also add state serialization?
81
}
82

83
// DaemonAdapters is a set of methods that serve as adapters to bridge the
84
// pure world of the FSM to the real world of the daemon. These will be used to
85
// do things like broadcast transactions, or send messages to peers.
86
type DaemonAdapters interface {
87
        // SendMessages sends the target set of messages to the target peer.
88
        SendMessages(btcec.PublicKey, []lnwire.Message) error
89

90
        // BroadcastTransaction broadcasts a transaction with the target label.
91
        BroadcastTransaction(*wire.MsgTx, string) error
92

93
        // RegisterConfirmationsNtfn registers an intent to be notified once
94
        // txid reaches numConfs confirmations. We also pass in the pkScript as
95
        // the default light client instead needs to match on scripts created
96
        // in the block. If a nil txid is passed in, then not only should we
97
        // match on the script, but we should also dispatch once the
98
        // transaction containing the script reaches numConfs confirmations.
99
        // This can be useful in instances where we only know the script in
100
        // advance, but not the transaction containing it.
101
        //
102
        // TODO(roasbeef): could abstract further?
103
        RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte,
104
                numConfs, heightHint uint32,
105
                opts ...chainntnfs.NotifierOption,
106
        ) (*chainntnfs.ConfirmationEvent, error)
107

108
        // RegisterSpendNtfn registers an intent to be notified once the target
109
        // outpoint is successfully spent within a transaction. The script that
110
        // the outpoint creates must also be specified. This allows this
111
        // interface to be implemented by BIP 158-like filtering.
112
        RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte,
113
                heightHint uint32) (*chainntnfs.SpendEvent, error)
114
}
115

116
// stateQuery is used by outside callers to query the internal state of the
117
// state machine.
118
type stateQuery[Event any, Env Environment] struct {
119
        // CurrentState is a channel that will be sent the current state of the
120
        // state machine.
121
        CurrentState chan State[Event, Env]
122
}
123

124
// StateMachine represents an abstract FSM that is able to process new incoming
125
// events and drive a state machine to termination. This implementation uses
126
// type params to abstract over the types of events and environment. Events
127
// trigger new state transitions, that use the environment to perform some
128
// action.
129
//
130
// TODO(roasbeef): terminal check, daemon event execution, init?
131
type StateMachine[Event any, Env Environment] struct {
132
        cfg StateMachineCfg[Event, Env]
133

134
        log btclog.Logger
135

136
        // events is the channel that will be used to send new events to the
137
        // FSM.
138
        events chan Event
139

140
        // newStateEvents is an EventDistributor that will be used to notify
141
        // any relevant callers of new state transitions that occur.
142
        newStateEvents *fn.EventDistributor[State[Event, Env]]
143

144
        // stateQuery is a channel that will be used by outside callers to
145
        // query the internal state machine state.
146
        stateQuery chan stateQuery[Event, Env]
147

148
        gm   fn.GoroutineManager
149
        quit chan struct{}
150

151
        startOnce sync.Once
152
        stopOnce  sync.Once
153
}
154

155
// ErrorReporter is an interface that's used to report errors that occur during
156
// state machine execution.
157
type ErrorReporter interface {
158
        // ReportError is a method that's used to report an error that occurred
159
        // during state machine execution.
160
        ReportError(err error)
161
}
162

163
// StateMachineCfg is a configuration struct that's used to create a new state
164
// machine.
165
type StateMachineCfg[Event any, Env Environment] struct {
166
        // ErrorReporter is used to report errors that occur during state
167
        // transitions.
168
        ErrorReporter ErrorReporter
169

170
        // Daemon is a set of adapters that will be used to bridge the FSM to
171
        // the daemon.
172
        Daemon DaemonAdapters
173

174
        // InitialState is the initial state of the state machine.
175
        InitialState State[Event, Env]
176

177
        // Env is the environment that the state machine will use to execute.
178
        Env Env
179

180
        // InitEvent is an optional event that will be sent to the state
181
        // machine as if it was emitted at the onset of the state machine. This
182
        // can be used to set up tracking state such as a txid confirmation
183
        // event.
184
        InitEvent fn.Option[DaemonEvent]
185

186
        // MsgMapper is an optional message mapper that can be used to map
187
        // normal wire messages into FSM events.
188
        MsgMapper fn.Option[MsgMapper[Event]]
189

190
        // CustomPollInterval is an optional custom poll interval that can be
191
        // used to set a quicker interval for tests.
192
        CustomPollInterval fn.Option[time.Duration]
193
}
194

195
// NewStateMachine creates a new state machine given a set of daemon adapters,
196
// an initial state, an environment, and an event to process as if emitted at
197
// the onset of the state machine. Such an event can be used to set up tracking
198
// state such as a txid confirmation event.
199
func NewStateMachine[Event any, Env Environment](
200
        cfg StateMachineCfg[Event, Env]) StateMachine[Event, Env] {
×
201

×
202
        return StateMachine[Event, Env]{
×
203
                cfg: cfg,
×
204
                log: log.WithPrefix(
×
205
                        fmt.Sprintf("FSM(%v):", cfg.Env.Name()),
×
206
                ),
×
207
                events:         make(chan Event, 1),
×
208
                stateQuery:     make(chan stateQuery[Event, Env]),
×
209
                gm:             *fn.NewGoroutineManager(),
×
210
                newStateEvents: fn.NewEventDistributor[State[Event, Env]](),
×
211
                quit:           make(chan struct{}),
×
212
        }
×
213
}
×
214

215
// Start starts the state machine. This will spawn a goroutine that will drive
216
// the state machine to completion.
217
func (s *StateMachine[Event, Env]) Start(ctx context.Context) {
×
218
        s.startOnce.Do(func() {
×
219
                _ = s.gm.Go(ctx, func(ctx context.Context) {
×
220
                        s.driveMachine(ctx)
×
221
                })
×
222
        })
223
}
224

225
// Stop stops the state machine. This will block until the state machine has
226
// reached a stopping point.
227
func (s *StateMachine[Event, Env]) Stop() {
×
228
        s.stopOnce.Do(func() {
×
229
                close(s.quit)
×
230
                s.gm.Stop()
×
231
        })
×
232
}
233

234
// SendEvent sends a new event to the state machine.
235
//
236
// TODO(roasbeef): bool if processed?
237
func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) {
×
238
        s.log.DebugS(ctx, "Sending event",
×
239
                "event", lnutils.SpewLogClosure(event))
×
240

×
241
        select {
×
242
        case s.events <- event:
×
243
        case <-ctx.Done():
×
244
                return
×
245
        case <-s.quit:
×
246
                return
×
247
        }
248
}
249

250
// CanHandle returns true if the target message can be routed to the state
251
// machine.
252
func (s *StateMachine[Event, Env]) CanHandle(msg lnwire.Message) bool {
×
253
        cfgMapper := s.cfg.MsgMapper
×
254
        return fn.MapOptionZ(cfgMapper, func(mapper MsgMapper[Event]) bool {
×
255
                return mapper.MapMsg(msg).IsSome()
×
256
        })
×
257
}
258

259
// Name returns the name of the state machine's environment.
260
func (s *StateMachine[Event, Env]) Name() string {
×
261
        return s.cfg.Env.Name()
×
262
}
×
263

264
// SendMessage attempts to send a wire message to the state machine. If the
265
// message can be mapped using the default message mapper, then true is
266
// returned indicating that the message was processed. Otherwise, false is
267
// returned.
268
func (s *StateMachine[Event, Env]) SendMessage(ctx context.Context,
269
        msg lnwire.Message) bool {
×
270

×
271
        // If we have no message mapper, then return false as we can't process
×
272
        // this message.
×
273
        if !s.cfg.MsgMapper.IsSome() {
×
274
                return false
×
275
        }
×
276

277
        s.log.DebugS(ctx, "Sending msg", "msg", lnutils.SpewLogClosure(msg))
×
278

×
279
        // Otherwise, try to map the message using the default message mapper.
×
280
        // If we can't extract an event, then we'll return false to indicate
×
281
        // that the message wasn't processed.
×
282
        var processed bool
×
283
        s.cfg.MsgMapper.WhenSome(func(mapper MsgMapper[Event]) {
×
284
                event := mapper.MapMsg(msg)
×
285

×
286
                event.WhenSome(func(event Event) {
×
287
                        s.SendEvent(ctx, event)
×
288

×
289
                        processed = true
×
290
                })
×
291
        })
292

293
        return processed
×
294
}
295

296
// CurrentState returns the current state of the state machine.
297
func (s *StateMachine[Event, Env]) CurrentState() (State[Event, Env], error) {
×
298
        query := stateQuery[Event, Env]{
×
299
                CurrentState: make(chan State[Event, Env], 1),
×
300
        }
×
301

×
302
        if !fn.SendOrQuit(s.stateQuery, query, s.quit) {
×
303
                return nil, ErrStateMachineShutdown
×
304
        }
×
305

306
        return fn.RecvOrTimeout(query.CurrentState, time.Second)
×
307
}
308

309
// StateSubscriber represents an active subscription to be notified of new
310
// state transitions.
311
type StateSubscriber[E any, F Environment] *fn.EventReceiver[State[E, F]]
312

313
// RegisterStateEvents registers a new event listener that will be notified of
314
// new state transitions.
315
func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[
316
        Event, Env] {
×
317

×
318
        subscriber := fn.NewEventReceiver[State[Event, Env]](10)
×
319

×
320
        // TODO(roasbeef): instead give the state and the input event?
×
321

×
322
        s.newStateEvents.RegisterSubscriber(subscriber)
×
323

×
324
        return subscriber
×
325
}
×
326

327
// RemoveStateSub removes the target state subscriber from the set of active
328
// subscribers.
329
func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[
330
        Event, Env]) {
×
331

×
332
        _ = s.newStateEvents.RemoveSubscriber(sub)
×
333
}
×
334

335
// executeDaemonEvent executes a daemon event, which is a special type of event
336
// that can be emitted as part of the state transition function of the state
337
// machine. An error is returned if the type of event is unknown.
338
func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
339
        event DaemonEvent) error {
×
340

×
341
        switch daemonEvent := event.(type) {
×
342
        // This is a send message event, so we'll send the event, and also mind
343
        // any preconditions as well as post-send events.
344
        case *SendMsgEvent[Event]:
×
345
                sendAndCleanUp := func() error {
×
346
                        s.log.DebugS(ctx, "Sending message to target",
×
347
                                btclog.Hex6("target", daemonEvent.TargetPeer.SerializeCompressed()),
×
348
                                "messages", lnutils.SpewLogClosure(daemonEvent.Msgs))
×
349

×
350
                        err := s.cfg.Daemon.SendMessages(
×
351
                                daemonEvent.TargetPeer, daemonEvent.Msgs,
×
352
                        )
×
353
                        if err != nil {
×
354
                                return fmt.Errorf("unable to send msgs: %w",
×
355
                                        err)
×
356
                        }
×
357

358
                        // If a post-send event was specified, then we'll funnel
359
                        // that back into the main state machine now as well.
360
                        return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll
×
361
                                launched := s.gm.Go(
×
362
                                        ctx, func(ctx context.Context) {
×
363
                                                s.log.DebugS(ctx, "Sending post-send event",
×
364
                                                        "event", lnutils.SpewLogClosure(event))
×
365

×
366
                                                s.SendEvent(ctx, event)
×
367
                                        },
×
368
                                )
369

370
                                if !launched {
×
371
                                        return ErrStateMachineShutdown
×
372
                                }
×
373

374
                                return nil
×
375
                        })
376
                }
377

378
                // If this doesn't have a SendWhen predicate, then we can just
379
                // send it off right away.
380
                if !daemonEvent.SendWhen.IsSome() {
×
381
                        return sendAndCleanUp()
×
382
                }
×
383

384
                // Otherwise, this has a SendWhen predicate, so we'll need
385
                // launch a goroutine to poll the SendWhen, then send only once
386
                // the predicate is true.
387
                launched := s.gm.Go(ctx, func(ctx context.Context) {
×
388
                        predicateTicker := time.NewTicker(
×
389
                                s.cfg.CustomPollInterval.UnwrapOr(pollInterval),
×
390
                        )
×
391
                        defer predicateTicker.Stop()
×
392

×
393
                        s.log.InfoS(ctx, "Waiting for send predicate to be true")
×
394

×
395
                        for {
×
396
                                select {
×
397
                                case <-predicateTicker.C:
×
398
                                        canSend := fn.MapOptionZ(
×
399
                                                daemonEvent.SendWhen,
×
400
                                                func(pred SendPredicate) bool {
×
401
                                                        return pred()
×
402
                                                },
×
403
                                        )
404

405
                                        if canSend {
×
406
                                                s.log.InfoS(ctx, "Send active predicate")
×
407

×
408
                                                err := sendAndCleanUp()
×
409
                                                if err != nil {
×
410
                                                        s.log.ErrorS(ctx, "Unable to send message", err)
×
411
                                                }
×
412

413
                                                return
×
414
                                        }
415

416
                                case <-ctx.Done():
×
417
                                        return
×
418
                                }
419
                        }
420
                })
421

422
                if !launched {
×
423
                        return ErrStateMachineShutdown
×
424
                }
×
425

426
                return nil
×
427

428
        // If this is a broadcast transaction event, then we'll broadcast with
429
        // the label attached.
430
        case *BroadcastTxn:
×
431
                s.log.DebugS(ctx, "Broadcasting txn",
×
432
                        "txid", daemonEvent.Tx.TxHash())
×
433

×
434
                err := s.cfg.Daemon.BroadcastTransaction(
×
435
                        daemonEvent.Tx, daemonEvent.Label,
×
436
                )
×
437
                if err != nil {
×
438
                        return fmt.Errorf("unable to broadcast txn: %w", err)
×
439
                }
×
440

441
                return nil
×
442

443
        // The state machine has requested a new event to be sent once a
444
        // transaction spending a specified outpoint has confirmed.
445
        case *RegisterSpend[Event]:
×
446
                s.log.DebugS(ctx, "Registering spend",
×
447
                        "outpoint", daemonEvent.OutPoint)
×
448

×
449
                spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn(
×
450
                        &daemonEvent.OutPoint, daemonEvent.PkScript,
×
451
                        daemonEvent.HeightHint,
×
452
                )
×
453
                if err != nil {
×
454
                        return fmt.Errorf("unable to register spend: %w", err)
×
455
                }
×
456

457
                launched := s.gm.Go(ctx, func(ctx context.Context) {
×
458
                        for {
×
459
                                select {
×
460
                                case spend, ok := <-spendEvent.Spend:
×
461
                                        if !ok {
×
462
                                                return
×
463
                                        }
×
464

465
                                        // If there's a post-send event, then
466
                                        // we'll send that into the current
467
                                        // state now.
468
                                        postSpend := daemonEvent.PostSpendEvent
×
469
                                        postSpend.WhenSome(func(f SpendMapper[Event]) { //nolint:ll
×
470
                                                customEvent := f(spend)
×
471
                                                s.SendEvent(ctx, customEvent)
×
472
                                        })
×
473

474
                                        return
×
475

476
                                case <-ctx.Done():
×
477
                                        return
×
478
                                }
479
                        }
480
                })
481

482
                if !launched {
×
483
                        return ErrStateMachineShutdown
×
484
                }
×
485

486
                return nil
×
487

488
        // The state machine has requested a new event to be sent once a
489
        // specified txid+pkScript pair has confirmed.
490
        case *RegisterConf[Event]:
×
491
                s.log.DebugS(ctx, "Registering conf",
×
492
                        "txid", daemonEvent.Txid)
×
493

×
494
                numConfs := daemonEvent.NumConfs.UnwrapOr(1)
×
495
                confEvent, err := s.cfg.Daemon.RegisterConfirmationsNtfn(
×
496
                        &daemonEvent.Txid, daemonEvent.PkScript,
×
497
                        numConfs, daemonEvent.HeightHint,
×
498
                )
×
499
                if err != nil {
×
500
                        return fmt.Errorf("unable to register conf: %w", err)
×
501
                }
×
502

503
                launched := s.gm.Go(ctx, func(ctx context.Context) {
×
504
                        for {
×
505
                                select {
×
506
                                case <-confEvent.Confirmed:
×
507
                                        // If there's a post-conf event, then
×
508
                                        // we'll send that into the current
×
509
                                        // state now.
×
510
                                        //
×
511
                                        // TODO(roasbeef): refactor to
×
512
                                        // dispatchAfterRecv w/ above
×
513
                                        postConf := daemonEvent.PostConfEvent
×
514
                                        postConf.WhenSome(func(e Event) {
×
515
                                                s.SendEvent(ctx, e)
×
516
                                        })
×
517

518
                                        return
×
519

520
                                case <-ctx.Done():
×
521
                                        return
×
522
                                }
523
                        }
524
                })
525

526
                if !launched {
×
527
                        return ErrStateMachineShutdown
×
528
                }
×
529

530
                return nil
×
531
        }
532

533
        return fmt.Errorf("unknown daemon event: %T", event)
×
534
}
535

536
// applyEvents applies a new event to the state machine. This will continue
537
// until no further events are emitted by the state machine. Along the way,
538
// we'll also ensure to execute any daemon events that are emitted.
539
func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
540
        currentState State[Event, Env], newEvent Event) (State[Event, Env],
541
        error) {
×
542

×
543
        s.log.DebugS(ctx, "Applying new event",
×
544
                "event", lnutils.SpewLogClosure(newEvent))
×
545

×
546
        eventQueue := fn.NewQueue(newEvent)
×
547

×
548
        // Given the next event to handle, we'll process the event, then add
×
549
        // any new emitted internal events to our event queue. This continues
×
550
        // until we reach a terminal state, or we run out of internal events to
×
551
        // process.
×
552
        //
×
553
        //nolint:ll
×
554
        for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() {
×
555
                err := fn.MapOptionZ(nextEvent, func(event Event) error {
×
556
                        s.log.DebugS(ctx, "Processing event",
×
557
                                "event", lnutils.SpewLogClosure(event))
×
558

×
559
                        // Apply the state transition function of the current
×
560
                        // state given this new event and our existing env.
×
561
                        transition, err := currentState.ProcessEvent(
×
562
                                event, s.cfg.Env,
×
563
                        )
×
564
                        if err != nil {
×
565
                                return err
×
566
                        }
×
567

568
                        newEvents := transition.NewEvents
×
569
                        err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error { //nolint:ll
×
570
                                // With the event processed, we'll process any
×
571
                                // new daemon events that were emitted as part
×
572
                                // of this new state transition.
×
573
                                for _, dEvent := range events.ExternalEvents {
×
574
                                        err := s.executeDaemonEvent(
×
575
                                                ctx, dEvent,
×
576
                                        )
×
577
                                        if err != nil {
×
578
                                                return err
×
579
                                        }
×
580
                                }
581

582
                                // Next, we'll add any new emitted events to our
583
                                // event queue.
584
                                //
585
                                //nolint:ll
586
                                for _, inEvent := range events.InternalEvent {
×
587
                                        s.log.DebugS(ctx, "Adding new internal event to queue",
×
588
                                                "event", lnutils.SpewLogClosure(inEvent))
×
589

×
590
                                        eventQueue.Enqueue(inEvent)
×
591
                                }
×
592

593
                                return nil
×
594
                        })
595
                        if err != nil {
×
596
                                return err
×
597
                        }
×
598

599
                        s.log.InfoS(ctx, "State transition",
×
600
                                btclog.Fmt("from_state", "%T", currentState),
×
601
                                btclog.Fmt("to_state", "%T", transition.NextState))
×
602

×
603
                        // With our events processed, we'll now update our
×
604
                        // internal state.
×
605
                        currentState = transition.NextState
×
606

×
607
                        // Notify our subscribers of the new state transition.
×
608
                        //
×
609
                        // TODO(roasbeef): will only give us the outer state?
×
610
                        //  * let FSMs choose which state to emit?
×
611
                        s.newStateEvents.NotifySubscribers(currentState)
×
612

×
613
                        return nil
×
614
                })
615
                if err != nil {
×
616
                        return currentState, err
×
617
                }
×
618
        }
619

620
        return currentState, nil
×
621
}
622

623
// driveMachine is the main event loop of the state machine. It accepts any new
624
// incoming events, and then drives the state machine forward until it reaches
625
// a terminal state.
626
func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
×
627
        s.log.DebugS(ctx, "Starting state machine")
×
628

×
629
        currentState := s.cfg.InitialState
×
630

×
631
        // Before we start, if we have an init daemon event specified, then
×
632
        // we'll handle that now.
×
633
        err := fn.MapOptionZ(s.cfg.InitEvent, func(event DaemonEvent) error {
×
634
                return s.executeDaemonEvent(ctx, event)
×
635
        })
×
636
        if err != nil {
×
637
                s.log.ErrorS(ctx, "Unable to execute init event", err)
×
638
                return
×
639
        }
×
640

641
        // We just started driving the state machine, so we'll notify our
642
        // subscribers of this starting state.
643
        s.newStateEvents.NotifySubscribers(currentState)
×
644

×
645
        for {
×
646
                select {
×
647
                // We have a new external event, so we'll drive the state
648
                // machine forward until we either run out of internal events,
649
                // or we reach a terminal state.
650
                case newEvent := <-s.events:
×
651
                        newState, err := s.applyEvents(
×
652
                                ctx, currentState, newEvent,
×
653
                        )
×
654
                        if err != nil {
×
655
                                s.cfg.ErrorReporter.ReportError(err)
×
656

×
657
                                s.log.ErrorS(ctx, "Unable to apply event", err)
×
658

×
659
                                // An error occurred, so we'll tear down the
×
660
                                // entire state machine as we can't proceed.
×
661
                                go s.Stop()
×
662

×
663
                                return
×
664
                        }
×
665

666
                        currentState = newState
×
667

668
                // An outside caller is querying our state, so we'll return the
669
                // latest state.
670
                case stateQuery := <-s.stateQuery:
×
671
                        if !fn.SendOrQuit(stateQuery.CurrentState, currentState, s.quit) { //nolint:ll
×
672
                                return
×
673
                        }
×
674

675
                case <-s.gm.Done():
×
676
                        return
×
677
                }
678
        }
679
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc