• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14847594682

05 May 2025 10:32PM UTC coverage: 58.58%. First build
14847594682

Pull #9726

github

web-flow
Merge b58451faa into 234949c29
Pull Request #9726: protofsm: add option to allow conf resp to return full back

0 of 5 new or added lines in 1 file covered. (0.0%)

97391 of 166254 relevant lines covered (58.58%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.57
/protofsm/state_machine.go
1
package protofsm
2

3
import (
4
        "context"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "github.com/btcsuite/btcd/btcec/v2"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/btcsuite/btclog/v2"
13
        "github.com/lightningnetwork/lnd/chainntnfs"
14
        "github.com/lightningnetwork/lnd/fn/v2"
15
        "github.com/lightningnetwork/lnd/lnutils"
16
        "github.com/lightningnetwork/lnd/lnwire"
17
        "github.com/lightningnetwork/lnd/msgmux"
18
)
19

20
const (
21
        // pollInterval is the interval at which we'll poll the SendWhen
22
        // predicate if specified.
23
        pollInterval = time.Millisecond * 100
24
)
25

26
var (
27
        // ErrStateMachineShutdown occurs when trying to feed an event to a
28
        // StateMachine that has been asked to Stop.
29
        ErrStateMachineShutdown = fmt.Errorf("StateMachine is shutting down")
30
)
31

32
// EmittedEvent is a special type that can be emitted by a state transition.
33
// This can container internal events which are to be routed back to the state,
34
// or external events which are to be sent to the daemon.
35
type EmittedEvent[Event any] struct {
36
        // InternalEvent is an optional internal event that is to be routed
37
        // back to the target state. This enables state to trigger one or many
38
        // state transitions without a new external event.
39
        InternalEvent []Event
40

41
        // ExternalEvent is an optional external event that is to be sent to
42
        // the daemon for dispatch. Usually, this is some form of I/O.
43
        ExternalEvents DaemonEventSet
44
}
45

46
// StateTransition is a state transition type. It denotes the next state to go
47
// to, and also the set of events to emit.
48
type StateTransition[Event any, Env Environment] struct {
49
        // NextState is the next state to transition to.
50
        NextState State[Event, Env]
51

52
        // NewEvents is the set of events to emit.
53
        NewEvents fn.Option[EmittedEvent[Event]]
54
}
55

56
// Environment is an abstract interface that represents the environment that
57
// the state machine will execute using. From the PoV of the main state machine
58
// executor, we just care about being able to clean up any resources that were
59
// allocated by the environment.
60
type Environment interface {
61
        // Name returns the name of the environment. This is used to uniquely
62
        // identify the environment of related state machines.
63
        Name() string
64
}
65

66
// State defines an abstract state along, namely its state transition function
67
// that takes as input an event and an environment, and returns a state
68
// transition (next state, and set of events to emit). As state can also either
69
// be terminal, or not, a terminal event causes state execution to halt.
70
type State[Event any, Env Environment] interface {
71
        // ProcessEvent takes an event and an environment, and returns a new
72
        // state transition. This will be iteratively called until either a
73
        // terminal state is reached, or no further internal events are
74
        // emitted.
75
        ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error)
76

77
        // IsTerminal returns true if this state is terminal, and false
78
        // otherwise.
79
        IsTerminal() bool
80

81
        // String returns a human readable string that represents the state.
82
        String() string
83
}
84

85
// DaemonAdapters is a set of methods that server as adapters to bridge the
86
// pure world of the FSM to the real world of the daemon. These will be used to
87
// do things like broadcast transactions, or send messages to peers.
88
type DaemonAdapters interface {
89
        // SendMessages sends the target set of messages to the target peer.
90
        SendMessages(btcec.PublicKey, []lnwire.Message) error
91

92
        // BroadcastTransaction broadcasts a transaction with the target label.
93
        BroadcastTransaction(*wire.MsgTx, string) error
94

95
        // RegisterConfirmationsNtfn registers an intent to be notified once
96
        // txid reaches numConfs confirmations. We also pass in the pkScript as
97
        // the default light client instead needs to match on scripts created
98
        // in the block. If a nil txid is passed in, then not only should we
99
        // match on the script, but we should also dispatch once the
100
        // transaction containing the script reaches numConfs confirmations.
101
        // This can be useful in instances where we only know the script in
102
        // advance, but not the transaction containing it.
103
        //
104
        // TODO(roasbeef): could abstract further?
105
        RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte,
106
                numConfs, heightHint uint32,
107
                opts ...chainntnfs.NotifierOption) (*chainntnfs.ConfirmationEvent, error)
108

109
        // RegisterSpendNtfn registers an intent to be notified once the target
110
        // outpoint is successfully spent within a transaction. The script that
111
        // the outpoint creates must also be specified. This allows this
112
        // interface to be implemented by BIP 158-like filtering.
113
        RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte,
114
                heightHint uint32) (*chainntnfs.SpendEvent, error)
115
}
116

117
// stateQuery is used by outside callers to query the internal state of the
118
// state machine.
119
type stateQuery[Event any, Env Environment] struct {
120
        // CurrentState is a channel that will be sent the current state of the
121
        // state machine.
122
        CurrentState chan State[Event, Env]
123
}
124

125
// StateMachine represents an abstract FSM that is able to process new incoming
126
// events and drive a state machine to termination. This implementation uses
127
// type params to abstract over the types of events and environment. Events
128
// trigger new state transitions, that use the environment to perform some
129
// action.
130
//
131
// TODO(roasbeef): terminal check, daemon event execution, init?
132
type StateMachine[Event any, Env Environment] struct {
133
        cfg StateMachineCfg[Event, Env]
134

135
        log btclog.Logger
136

137
        // events is the channel that will be used to send new events to the
138
        // FSM.
139
        events chan Event
140

141
        // newStateEvents is an EventDistributor that will be used to notify
142
        // any relevant callers of new state transitions that occur.
143
        newStateEvents *fn.EventDistributor[State[Event, Env]]
144

145
        // stateQuery is a channel that will be used by outside callers to
146
        // query the internal state machine state.
147
        stateQuery chan stateQuery[Event, Env]
148

149
        gm   fn.GoroutineManager
150
        quit chan struct{}
151

152
        startOnce sync.Once
153
        stopOnce  sync.Once
154
}
155

156
// ErrorReporter is an interface that's used to report errors that occur during
157
// state machine execution.
158
type ErrorReporter interface {
159
        // ReportError is a method that's used to report an error that occurred
160
        // during state machine execution.
161
        ReportError(err error)
162
}
163

164
// StateMachineCfg is a configuration struct that's used to create a new state
165
// machine.
166
type StateMachineCfg[Event any, Env Environment] struct {
167
        // ErrorReporter is used to report errors that occur during state
168
        // transitions.
169
        ErrorReporter ErrorReporter
170

171
        // Daemon is a set of adapters that will be used to bridge the FSM to
172
        // the daemon.
173
        Daemon DaemonAdapters
174

175
        // InitialState is the initial state of the state machine.
176
        InitialState State[Event, Env]
177

178
        // Env is the environment that the state machine will use to execute.
179
        Env Env
180

181
        // InitEvent is an optional event that will be sent to the state
182
        // machine as if it was emitted at the onset of the state machine. This
183
        // can be used to set up tracking state such as a txid confirmation
184
        // event.
185
        InitEvent fn.Option[DaemonEvent]
186

187
        // MsgMapper is an optional message mapper that can be used to map
188
        // normal wire messages into FSM events.
189
        MsgMapper fn.Option[MsgMapper[Event]]
190

191
        // CustomPollInterval is an optional custom poll interval that can be
192
        // used to set a quicker interval for tests.
193
        CustomPollInterval fn.Option[time.Duration]
194
}
195

196
// NewStateMachine creates a new state machine given a set of daemon adapters,
197
// an initial state, an environment, and an event to process as if emitted at
198
// the onset of the state machine. Such an event can be used to set up tracking
199
// state such as a txid confirmation event.
200
func NewStateMachine[Event any, Env Environment](
201
        cfg StateMachineCfg[Event, Env]) StateMachine[Event, Env] {
3✔
202

3✔
203
        return StateMachine[Event, Env]{
3✔
204
                cfg: cfg,
3✔
205
                log: log.WithPrefix(
3✔
206
                        fmt.Sprintf("FSM(%v):", cfg.Env.Name()),
3✔
207
                ),
3✔
208
                events:         make(chan Event, 1),
3✔
209
                stateQuery:     make(chan stateQuery[Event, Env]),
3✔
210
                gm:             *fn.NewGoroutineManager(),
3✔
211
                newStateEvents: fn.NewEventDistributor[State[Event, Env]](),
3✔
212
                quit:           make(chan struct{}),
3✔
213
        }
3✔
214
}
3✔
215

216
// Start starts the state machine. This will spawn a goroutine that will drive
217
// the state machine to completion.
218
func (s *StateMachine[Event, Env]) Start(ctx context.Context) {
3✔
219
        s.startOnce.Do(func() {
6✔
220
                _ = s.gm.Go(ctx, func(ctx context.Context) {
6✔
221
                        s.driveMachine(ctx)
3✔
222
                })
3✔
223
        })
224
}
225

226
// Stop stops the state machine. This will block until the state machine has
227
// reached a stopping point.
228
func (s *StateMachine[Event, Env]) Stop() {
×
229
        s.stopOnce.Do(func() {
×
230
                close(s.quit)
×
231
                s.gm.Stop()
×
232
        })
×
233
}
234

235
// SendEvent sends a new event to the state machine.
236
//
237
// TODO(roasbeef): bool if processed?
238
func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) {
3✔
239
        s.log.Debugf("Sending event %T", event)
3✔
240

3✔
241
        select {
3✔
242
        case s.events <- event:
3✔
243
        case <-ctx.Done():
×
244
                return
×
245
        case <-s.quit:
×
246
                return
×
247
        }
248
}
249

250
// CanHandle returns true if the target message can be routed to the state
251
// machine.
252
func (s *StateMachine[Event, Env]) CanHandle(msg msgmux.PeerMsg) bool {
3✔
253
        cfgMapper := s.cfg.MsgMapper
3✔
254
        return fn.MapOptionZ(cfgMapper, func(mapper MsgMapper[Event]) bool {
6✔
255
                return mapper.MapMsg(msg).IsSome()
3✔
256
        })
3✔
257
}
258

259
// Name returns the name of the state machine's environment.
260
func (s *StateMachine[Event, Env]) Name() string {
3✔
261
        return s.cfg.Env.Name()
3✔
262
}
3✔
263

264
// SendMessage attempts to send a wire message to the state machine. If the
265
// message can be mapped using the default message mapper, then true is
266
// returned indicating that the message was processed. Otherwise, false is
267
// returned.
268
func (s *StateMachine[Event, Env]) SendMessage(ctx context.Context,
269
        msg msgmux.PeerMsg) bool {
3✔
270

3✔
271
        // If we have no message mapper, then return false as we can't process
3✔
272
        // this message.
3✔
273
        if !s.cfg.MsgMapper.IsSome() {
3✔
274
                return false
×
275
        }
×
276

277
        s.log.DebugS(ctx, "Sending msg", "msg", lnutils.SpewLogClosure(msg))
3✔
278

3✔
279
        // Otherwise, try to map the message using the default message mapper.
3✔
280
        // If we can't extract an event, then we'll return false to indicate
3✔
281
        // that the message wasn't processed.
3✔
282
        var processed bool
3✔
283
        s.cfg.MsgMapper.WhenSome(func(mapper MsgMapper[Event]) {
6✔
284
                event := mapper.MapMsg(msg)
3✔
285

3✔
286
                event.WhenSome(func(event Event) {
6✔
287
                        s.SendEvent(ctx, event)
3✔
288

3✔
289
                        processed = true
3✔
290
                })
3✔
291
        })
292

293
        return processed
3✔
294
}
295

296
// CurrentState returns the current state of the state machine.
297
func (s *StateMachine[Event, Env]) CurrentState() (State[Event, Env], error) {
3✔
298
        query := stateQuery[Event, Env]{
3✔
299
                CurrentState: make(chan State[Event, Env], 1),
3✔
300
        }
3✔
301

3✔
302
        if !fn.SendOrQuit(s.stateQuery, query, s.quit) {
3✔
303
                return nil, ErrStateMachineShutdown
×
304
        }
×
305

306
        return fn.RecvOrTimeout(query.CurrentState, time.Second)
3✔
307
}
308

309
// StateSubscriber represents an active subscription to be notified of new
310
// state transitions.
311
type StateSubscriber[E any, F Environment] *fn.EventReceiver[State[E, F]]
312

313
// RegisterStateEvents registers a new event listener that will be notified of
314
// new state transitions.
315
func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[
316
        Event, Env] {
3✔
317

3✔
318
        subscriber := fn.NewEventReceiver[State[Event, Env]](10)
3✔
319

3✔
320
        // TODO(roasbeef): instead give the state and the input event?
3✔
321

3✔
322
        s.newStateEvents.RegisterSubscriber(subscriber)
3✔
323

3✔
324
        return subscriber
3✔
325
}
3✔
326

327
// RemoveStateSub removes the target state subscriber from the set of active
328
// subscribers.
329
func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[
330
        Event, Env]) {
3✔
331

3✔
332
        _ = s.newStateEvents.RemoveSubscriber(sub)
3✔
333
}
3✔
334

335
// executeDaemonEvent executes a daemon event, which is a special type of event
336
// that can be emitted as part of the state transition function of the state
337
// machine. An error is returned if the type of event is unknown.
338
func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
339
        event DaemonEvent) error {
3✔
340

3✔
341
        switch daemonEvent := event.(type) {
3✔
342
        // This is a send message event, so we'll send the event, and also mind
343
        // any preconditions as well as post-send events.
344
        case *SendMsgEvent[Event]:
3✔
345
                sendAndCleanUp := func() error {
6✔
346
                        s.log.DebugS(ctx, "Sending message:",
3✔
347
                                btclog.Hex6("target", daemonEvent.TargetPeer.SerializeCompressed()),
3✔
348
                                "messages", lnutils.SpewLogClosure(daemonEvent.Msgs))
3✔
349

3✔
350
                        err := s.cfg.Daemon.SendMessages(
3✔
351
                                daemonEvent.TargetPeer, daemonEvent.Msgs,
3✔
352
                        )
3✔
353
                        if err != nil {
3✔
354
                                return fmt.Errorf("unable to send msgs: %w",
×
355
                                        err)
×
356
                        }
×
357

358
                        // If a post-send event was specified, then we'll funnel
359
                        // that back into the main state machine now as well.
360
                        return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll
6✔
361
                                launched := s.gm.Go(
3✔
362
                                        ctx, func(ctx context.Context) {
6✔
363
                                                s.log.DebugS(ctx, "Sending post-send event",
3✔
364
                                                        "event", lnutils.SpewLogClosure(event))
3✔
365

3✔
366
                                                s.SendEvent(ctx, event)
3✔
367
                                        },
3✔
368
                                )
369

370
                                if !launched {
3✔
371
                                        return ErrStateMachineShutdown
×
372
                                }
×
373

374
                                return nil
3✔
375
                        })
376
                }
377

378
                canSend := func() bool {
6✔
379
                        return fn.MapOptionZ(
3✔
380
                                daemonEvent.SendWhen,
3✔
381
                                func(pred SendPredicate) bool {
6✔
382
                                        return pred()
3✔
383
                                },
3✔
384
                        )
385
                }
386

387
                // If this doesn't have a SendWhen predicate, or if it's already
388
                // true, then we can just send it off right away.
389
                if !daemonEvent.SendWhen.IsSome() || canSend() {
6✔
390
                        return sendAndCleanUp()
3✔
391
                }
3✔
392

393
                // Otherwise, this has a SendWhen predicate, so we'll need
394
                // launch a goroutine to poll the SendWhen, then send only once
395
                // the predicate is true.
396
                launched := s.gm.Go(ctx, func(ctx context.Context) {
×
397
                        predicateTicker := time.NewTicker(
×
398
                                s.cfg.CustomPollInterval.UnwrapOr(pollInterval),
×
399
                        )
×
400
                        defer predicateTicker.Stop()
×
401

×
402
                        s.log.InfoS(ctx, "Waiting for send predicate to be true")
×
403

×
404
                        for {
×
405
                                select {
×
406
                                case <-predicateTicker.C:
×
407
                                        if canSend() {
×
408
                                                s.log.InfoS(ctx, "Send active predicate")
×
409

×
410
                                                err := sendAndCleanUp()
×
411
                                                if err != nil {
×
412
                                                        s.log.ErrorS(ctx, "Unable to send message", err)
×
413
                                                }
×
414

415
                                                return
×
416
                                        }
417

418
                                case <-ctx.Done():
×
419
                                        return
×
420
                                }
421
                        }
422
                })
423

424
                if !launched {
×
425
                        return ErrStateMachineShutdown
×
426
                }
×
427

428
                return nil
×
429

430
        // If this is a broadcast transaction event, then we'll broadcast with
431
        // the label attached.
432
        case *BroadcastTxn:
3✔
433
                s.log.DebugS(ctx, "Broadcasting txn",
3✔
434
                        "txid", daemonEvent.Tx.TxHash())
3✔
435

3✔
436
                err := s.cfg.Daemon.BroadcastTransaction(
3✔
437
                        daemonEvent.Tx, daemonEvent.Label,
3✔
438
                )
3✔
439
                if err != nil {
6✔
440
                        log.Errorf("unable to broadcast txn: %v", err)
3✔
441
                }
3✔
442

443
                return nil
3✔
444

445
        // The state machine has requested a new event to be sent once a
446
        // transaction spending a specified outpoint has confirmed.
447
        case *RegisterSpend[Event]:
3✔
448
                s.log.DebugS(ctx, "Registering spend",
3✔
449
                        "outpoint", daemonEvent.OutPoint)
3✔
450

3✔
451
                spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn(
3✔
452
                        &daemonEvent.OutPoint, daemonEvent.PkScript,
3✔
453
                        daemonEvent.HeightHint,
3✔
454
                )
3✔
455
                if err != nil {
3✔
456
                        return fmt.Errorf("unable to register spend: %w", err)
×
457
                }
×
458

459
                launched := s.gm.Go(ctx, func(ctx context.Context) {
6✔
460
                        for {
6✔
461
                                select {
3✔
462
                                case spend, ok := <-spendEvent.Spend:
3✔
463
                                        if !ok {
6✔
464
                                                return
3✔
465
                                        }
3✔
466

467
                                        // If there's a post-send event, then
468
                                        // we'll send that into the current
469
                                        // state now.
470
                                        postSpend := daemonEvent.PostSpendEvent
3✔
471
                                        postSpend.WhenSome(func(f SpendMapper[Event]) { //nolint:ll
6✔
472
                                                customEvent := f(spend)
3✔
473
                                                s.SendEvent(ctx, customEvent)
3✔
474
                                        })
3✔
475

476
                                        return
3✔
477

478
                                case <-ctx.Done():
×
479
                                        return
×
480
                                }
481
                        }
482
                })
483

484
                if !launched {
3✔
485
                        return ErrStateMachineShutdown
×
486
                }
×
487

488
                return nil
3✔
489

490
        // The state machine has requested a new event to be sent once a
491
        // specified txid+pkScript pair has confirmed.
492
        case *RegisterConf[Event]:
×
493
                s.log.DebugS(ctx, "Registering conf",
×
494
                        "txid", daemonEvent.Txid)
×
495

×
NEW
496
                var opts []chainntnfs.NotifierOption
×
NEW
497
                if daemonEvent.FullBlock {
×
NEW
498
                        opts = append(opts, chainntnfs.WithIncludeBlock())
×
NEW
499
                }
×
500

501
                numConfs := daemonEvent.NumConfs.UnwrapOr(1)
×
502
                confEvent, err := s.cfg.Daemon.RegisterConfirmationsNtfn(
×
503
                        &daemonEvent.Txid, daemonEvent.PkScript,
×
NEW
504
                        numConfs, daemonEvent.HeightHint, opts...,
×
505
                )
×
506
                if err != nil {
×
507
                        return fmt.Errorf("unable to register conf: %w", err)
×
508
                }
×
509

510
                launched := s.gm.Go(ctx, func(ctx context.Context) {
×
511
                        for {
×
512
                                select {
×
513
                                case conf, ok := <-confEvent.Confirmed:
×
514
                                        if !ok {
×
515
                                                return
×
516
                                        }
×
517

518
                                        // If there's a post-conf mapper, then
519
                                        // we'll send that into the current
520
                                        // state now.
521
                                        postConfMapper := daemonEvent.PostConfMapper        //nolint:ll
×
522
                                        postConfMapper.WhenSome(func(f ConfMapper[Event]) { //nolint:ll
×
523
                                                customEvent := f(conf)
×
524
                                                s.SendEvent(ctx, customEvent)
×
525
                                        })
×
526

527
                                        return
×
528

529
                                case <-ctx.Done():
×
530
                                        return
×
531
                                }
532
                        }
533
                })
534

535
                if !launched {
×
536
                        return ErrStateMachineShutdown
×
537
                }
×
538

539
                return nil
×
540
        }
541

542
        return fmt.Errorf("unknown daemon event: %T", event)
×
543
}
544

545
// applyEvents applies a new event to the state machine. This will continue
546
// until no further events are emitted by the state machine. Along the way,
547
// we'll also ensure to execute any daemon events that are emitted.
548
func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
549
        currentState State[Event, Env], newEvent Event) (State[Event, Env],
550
        error) {
3✔
551

3✔
552
        eventQueue := fn.NewQueue(newEvent)
3✔
553

3✔
554
        // Given the next event to handle, we'll process the event, then add
3✔
555
        // any new emitted internal events to our event queue. This continues
3✔
556
        // until we reach a terminal state, or we run out of internal events to
3✔
557
        // process.
3✔
558
        //
3✔
559
        //nolint:ll
3✔
560
        for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() {
6✔
561
                err := fn.MapOptionZ(nextEvent, func(event Event) error {
6✔
562
                        s.log.DebugS(ctx, "Processing event",
3✔
563
                                "event", lnutils.SpewLogClosure(event))
3✔
564

3✔
565
                        // Apply the state transition function of the current
3✔
566
                        // state given this new event and our existing env.
3✔
567
                        transition, err := currentState.ProcessEvent(
3✔
568
                                event, s.cfg.Env,
3✔
569
                        )
3✔
570
                        if err != nil {
3✔
571
                                return err
×
572
                        }
×
573

574
                        newEvents := transition.NewEvents
3✔
575
                        err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error { //nolint:ll
6✔
576
                                // With the event processed, we'll process any
3✔
577
                                // new daemon events that were emitted as part
3✔
578
                                // of this new state transition.
3✔
579
                                for _, dEvent := range events.ExternalEvents {
6✔
580
                                        err := s.executeDaemonEvent(
3✔
581
                                                ctx, dEvent,
3✔
582
                                        )
3✔
583
                                        if err != nil {
3✔
584
                                                return err
×
585
                                        }
×
586
                                }
587

588
                                // Next, we'll add any new emitted events to our
589
                                // event queue.
590
                                //
591
                                //nolint:ll
592
                                for _, inEvent := range events.InternalEvent {
6✔
593
                                        s.log.DebugS(ctx, "Adding new internal event to queue",
3✔
594
                                                "event", lnutils.SpewLogClosure(inEvent))
3✔
595

3✔
596
                                        eventQueue.Enqueue(inEvent)
3✔
597
                                }
3✔
598

599
                                return nil
3✔
600
                        })
601
                        if err != nil {
3✔
602
                                return err
×
603
                        }
×
604

605
                        s.log.InfoS(ctx, "State transition",
3✔
606
                                btclog.Fmt("from_state", "%v", currentState),
3✔
607
                                btclog.Fmt("to_state", "%v", transition.NextState))
3✔
608

3✔
609
                        // With our events processed, we'll now update our
3✔
610
                        // internal state.
3✔
611
                        currentState = transition.NextState
3✔
612

3✔
613
                        // Notify our subscribers of the new state transition.
3✔
614
                        //
3✔
615
                        // TODO(roasbeef): will only give us the outer state?
3✔
616
                        //  * let FSMs choose which state to emit?
3✔
617
                        s.newStateEvents.NotifySubscribers(currentState)
3✔
618

3✔
619
                        return nil
3✔
620
                })
621
                if err != nil {
3✔
622
                        return currentState, err
×
623
                }
×
624
        }
625

626
        return currentState, nil
3✔
627
}
628

629
// driveMachine is the main event loop of the state machine. It accepts any new
630
// incoming events, and then drives the state machine forward until it reaches
631
// a terminal state.
632
func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
3✔
633
        s.log.DebugS(ctx, "Starting state machine")
3✔
634

3✔
635
        currentState := s.cfg.InitialState
3✔
636

3✔
637
        // Before we start, if we have an init daemon event specified, then
3✔
638
        // we'll handle that now.
3✔
639
        err := fn.MapOptionZ(s.cfg.InitEvent, func(event DaemonEvent) error {
6✔
640
                return s.executeDaemonEvent(ctx, event)
3✔
641
        })
3✔
642
        if err != nil {
3✔
643
                s.log.ErrorS(ctx, "Unable to execute init event", err)
×
644
                return
×
645
        }
×
646

647
        // We just started driving the state machine, so we'll notify our
648
        // subscribers of this starting state.
649
        s.newStateEvents.NotifySubscribers(currentState)
3✔
650

3✔
651
        for {
6✔
652
                select {
3✔
653
                // We have a new external event, so we'll drive the state
654
                // machine forward until we either run out of internal events,
655
                // or we reach a terminal state.
656
                case newEvent := <-s.events:
3✔
657
                        newState, err := s.applyEvents(
3✔
658
                                ctx, currentState, newEvent,
3✔
659
                        )
3✔
660
                        if err != nil {
3✔
661
                                s.cfg.ErrorReporter.ReportError(err)
×
662

×
663
                                s.log.ErrorS(ctx, "Unable to apply event", err)
×
664

×
665
                                // An error occurred, so we'll tear down the
×
666
                                // entire state machine as we can't proceed.
×
667
                                go s.Stop()
×
668

×
669
                                return
×
670
                        }
×
671

672
                        currentState = newState
3✔
673

674
                // An outside caller is querying our state, so we'll return the
675
                // latest state.
676
                case stateQuery := <-s.stateQuery:
3✔
677
                        if !fn.SendOrQuit(stateQuery.CurrentState, currentState, s.quit) { //nolint:ll
3✔
678
                                return
×
679
                        }
×
680

681
                case <-s.gm.Done():
×
682
                        return
×
683
                }
684
        }
685
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc