• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13974489001

20 Mar 2025 04:32PM UTC coverage: 56.292% (-2.9%) from 59.168%
13974489001

Pull #8754

github

web-flow
Merge aed149e6b into ea050d06f
Pull Request #8754: Add `Outbound` Remote Signer implementation

594 of 1713 new or added lines in 26 files covered. (34.68%)

23052 existing lines in 272 files now uncovered.

105921 of 188165 relevant lines covered (56.29%)

23796.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.13
/protofsm/state_machine.go
1
package protofsm
2

3
import (
4
        "context"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "github.com/btcsuite/btcd/btcec/v2"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/btcsuite/btclog/v2"
13
        "github.com/lightningnetwork/lnd/chainntnfs"
14
        "github.com/lightningnetwork/lnd/fn/v2"
15
        "github.com/lightningnetwork/lnd/lnutils"
16
        "github.com/lightningnetwork/lnd/lnwire"
17
        "github.com/lightningnetwork/lnd/msgmux"
18
)
19

20
const (
21
        // pollInterval is the default interval at which we'll poll the
22
        // SendWhen predicate of a SendMsgEvent if one is specified. It is
        // only used when StateMachineCfg.CustomPollInterval is not set.
23
        pollInterval = time.Millisecond * 100
24
)
25

26
var (
27
        // ErrStateMachineShutdown is returned when an operation can't be
28
        // carried out because the StateMachine has been asked to Stop, e.g.
        // when querying the current state or launching a helper goroutine.
29
        ErrStateMachineShutdown = fmt.Errorf("StateMachine is shutting down")
30
)
31

32
// EmittedEvent is a special type that can be emitted by a state transition.
33
// This can contain internal events which are to be routed back to the state,
34
// or external events which are to be sent to the daemon.
35
type EmittedEvent[Event any] struct {
36
        // InternalEvent is an optional set of internal events that are to be
37
        // routed back to the target state. This enables state to trigger one
38
        // or many state transitions without a new external event.
39
        InternalEvent []Event
40

41
        // ExternalEvents is an optional set of external events that are to be
42
        // sent to the daemon for dispatch. Usually, this is some form of I/O.
43
        ExternalEvents DaemonEventSet
44
}
45

46
// StateTransition is a state transition type. It denotes the next state to go
47
// to, and also the set of events to emit.
48
type StateTransition[Event any, Env Environment] struct {
49
        // NextState is the next state to transition to.
50
        NextState State[Event, Env]
51

52
        // NewEvents is the optional set of internal and external events to
        // emit as part of this transition.
53
        NewEvents fn.Option[EmittedEvent[Event]]
54
}
55

56
// Environment is an abstract interface that represents the environment that
57
// the state machine will execute using. From the PoV of the main state machine
58
// executor, we only need to be able to obtain the environment's name, which
59
// is used to prefix the state machine's log output.
60
type Environment interface {
61
        // Name returns the name of the environment. This is used to uniquely
62
        // identify the environment of related state machines.
63
        Name() string
64
}
65

66
// State defines an abstract state, namely its state transition function
67
// that takes as input an event and an environment, and returns a state
68
// transition (next state, and set of events to emit). A state can also either
69
// be terminal, or not; a terminal state is meant to cause execution to halt.
//
// NOTE(review): the executor code visible in this file never calls
// IsTerminal - confirm where terminal states are enforced.
70
type State[Event any, Env Environment] interface {
71
        // ProcessEvent takes an event and an environment, and returns a new
72
        // state transition. This will be iteratively called until either a
73
        // terminal state is reached, or no further internal events are
74
        // emitted.
75
        ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error)
76

77
        // IsTerminal returns true if this state is terminal, and false
78
        // otherwise.
79
        IsTerminal() bool
80

81
        // String returns a human readable string that represents the state.
82
        String() string
83
}
84

85
// DaemonAdapters is a set of methods that serve as adapters to bridge the
86
// pure world of the FSM to the real world of the daemon. These will be used to
87
// do things like broadcast transactions, or send messages to peers.
88
type DaemonAdapters interface {
89
        // SendMessages sends the target set of messages to the target peer.
90
        SendMessages(btcec.PublicKey, []lnwire.Message) error
91

92
        // BroadcastTransaction broadcasts a transaction with the target label.
93
        BroadcastTransaction(*wire.MsgTx, string) error
94

95
        // RegisterConfirmationsNtfn registers an intent to be notified once
96
        // txid reaches numConfs confirmations. We also pass in the pkScript as
97
        // the default light client instead needs to match on scripts created
98
        // in the block. If a nil txid is passed in, then not only should we
99
        // match on the script, but we should also dispatch once the
100
        // transaction containing the script reaches numConfs confirmations.
101
        // This can be useful in instances where we only know the script in
102
        // advance, but not the transaction containing it.
103
        //
104
        // TODO(roasbeef): could abstract further?
105
        RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte,
106
                numConfs, heightHint uint32,
107
                opts ...chainntnfs.NotifierOption,
108
        ) (*chainntnfs.ConfirmationEvent, error)
109

110
        // RegisterSpendNtfn registers an intent to be notified once the target
111
        // outpoint is successfully spent within a transaction. The script that
112
        // the outpoint creates must also be specified. This allows this
113
        // interface to be implemented by BIP 158-like filtering.
114
        RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte,
115
                heightHint uint32) (*chainntnfs.SpendEvent, error)
116
}
117

118
// stateQuery is used by outside callers to query the internal state of the
119
// state machine.
120
type stateQuery[Event any, Env Environment] struct {
121
        // CurrentState is the channel on which the event loop replies with
122
        // the current state of the state machine.
123
        CurrentState chan State[Event, Env]
124
}
125

126
// StateMachine represents an abstract FSM that is able to process new incoming
127
// events and drive a state machine to termination. This implementation uses
128
// type params to abstract over the types of events and environment. Events
129
// trigger new state transitions, that use the environment to perform some
130
// action.
131
//
132
// TODO(roasbeef): terminal check, daemon event execution, init?
133
type StateMachine[Event any, Env Environment] struct {
134
        // cfg is the configuration the state machine was created with.
        cfg StateMachineCfg[Event, Env]
135

136
        // log is the logger used by the state machine. It is prefixed with
        // the name of the configured environment.
        log btclog.Logger
137

138
        // events is the channel that will be used to send new events to the
139
        // FSM.
140
        events chan Event
141

142
        // newStateEvents is an EventDistributor that will be used to notify
143
        // any relevant callers of new state transitions that occur.
144
        newStateEvents *fn.EventDistributor[State[Event, Env]]
145

146
        // stateQuery is a channel that will be used by outside callers to
147
        // query the internal state machine state.
148
        stateQuery chan stateQuery[Event, Env]
149

150
        // gm launches and tracks every goroutine spawned by the state
        // machine, including the main event loop.
        gm   fn.GoroutineManager
151
        // quit is closed by Stop to unblock any pending sends or queries.
        quit chan struct{}
152

153
        // startOnce and stopOnce make Start and Stop take effect only once.
        startOnce sync.Once
154
        stopOnce  sync.Once
155
}
156

157
// ErrorReporter is an interface that's used to report errors that occur during
158
// state machine execution.
159
type ErrorReporter interface {
160
        // ReportError is a method that's used to report an error that occurred
161
        // during state machine execution. The event loop calls it when
        // applying an event fails, right before the machine is torn down.
162
        ReportError(err error)
163
}
164

165
// StateMachineCfg is a configuration struct that's used to create a new state
166
// machine.
167
type StateMachineCfg[Event any, Env Environment] struct {
168
        // ErrorReporter is used to report errors that occur during state
169
        // transitions.
170
        ErrorReporter ErrorReporter
171

172
        // Daemon is a set of adapters that will be used to bridge the FSM to
173
        // the daemon.
174
        Daemon DaemonAdapters
175

176
        // InitialState is the initial state of the state machine.
177
        InitialState State[Event, Env]
178

179
        // Env is the environment that the state machine will use to execute.
180
        Env Env
181

182
        // InitEvent is an optional daemon event that will be executed once,
183
        // before the event loop starts, as if it was emitted at the onset of
184
        // the state machine. This can be used to set up tracking state such
185
        // as a txid confirmation event.
186
        InitEvent fn.Option[DaemonEvent]
187

188
        // MsgMapper is an optional message mapper that can be used to map
189
        // normal wire messages into FSM events.
190
        MsgMapper fn.Option[MsgMapper[Event]]
191

192
        // CustomPollInterval is an optional custom poll interval that can be
193
        // used to set a quicker interval for tests. When unset, pollInterval
        // is used.
194
        CustomPollInterval fn.Option[time.Duration]
195
}
196

197
// NewStateMachine creates a new state machine given a set of daemon adapters,
198
// an initial state, an environment, and an event to process as if emitted at
199
// the onset of the state machine. Such an event can be used to set up tracking
200
// state such as a txid confirmation event.
201
func NewStateMachine[Event any, Env Environment](
202
        cfg StateMachineCfg[Event, Env]) StateMachine[Event, Env] {
46✔
203

46✔
204
        return StateMachine[Event, Env]{
46✔
205
                cfg: cfg,
46✔
206
                log: log.WithPrefix(
46✔
207
                        fmt.Sprintf("FSM(%v):", cfg.Env.Name()),
46✔
208
                ),
46✔
209
                events:         make(chan Event, 1),
46✔
210
                stateQuery:     make(chan stateQuery[Event, Env]),
46✔
211
                gm:             *fn.NewGoroutineManager(),
46✔
212
                newStateEvents: fn.NewEventDistributor[State[Event, Env]](),
46✔
213
                quit:           make(chan struct{}),
46✔
214
        }
46✔
215
}
46✔
216

217
// Start starts the state machine. This will spawn a goroutine that will drive
218
// the state machine to completion.
219
func (s *StateMachine[Event, Env]) Start(ctx context.Context) {
46✔
220
        s.startOnce.Do(func() {
92✔
221
                _ = s.gm.Go(ctx, func(ctx context.Context) {
92✔
222
                        s.driveMachine(ctx)
46✔
223
                })
46✔
224
        })
225
}
226

227
// Stop stops the state machine. This will block until the state machine has
228
// reached a stopping point.
229
func (s *StateMachine[Event, Env]) Stop() {
60✔
230
        s.stopOnce.Do(func() {
106✔
231
                close(s.quit)
46✔
232
                s.gm.Stop()
46✔
233
        })
46✔
234
}
235

236
// SendEvent sends a new event to the state machine.
237
//
238
// TODO(roasbeef): bool if processed?
239
func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) {
58✔
240
        s.log.DebugS(ctx, "Sending event",
58✔
241
                "event", lnutils.SpewLogClosure(event))
58✔
242

58✔
243
        select {
58✔
244
        case s.events <- event:
58✔
245
        case <-ctx.Done():
×
246
                return
×
247
        case <-s.quit:
×
248
                return
×
249
        }
250
}
251

252
// CanHandle returns true if the target message can be routed to the state
253
// machine.
254
func (s *StateMachine[Event, Env]) CanHandle(msg msgmux.PeerMsg) bool {
2✔
255
        cfgMapper := s.cfg.MsgMapper
2✔
256
        return fn.MapOptionZ(cfgMapper, func(mapper MsgMapper[Event]) bool {
4✔
257
                return mapper.MapMsg(msg).IsSome()
2✔
258
        })
2✔
259
}
260

261
// Name returns the name of the state machine's environment.
UNCOV
262
func (s *StateMachine[Event, Env]) Name() string {
×
UNCOV
263
        return s.cfg.Env.Name()
×
UNCOV
264
}
×
265

266
// SendMessage attempts to send a wire message to the state machine. If the
267
// message can be mapped using the default message mapper, then true is
268
// returned indicating that the message was processed. Otherwise, false is
269
// returned.
270
func (s *StateMachine[Event, Env]) SendMessage(ctx context.Context,
271
        msg msgmux.PeerMsg) bool {
1✔
272

1✔
273
        // If we have no message mapper, then return false as we can't process
1✔
274
        // this message.
1✔
275
        if !s.cfg.MsgMapper.IsSome() {
1✔
276
                return false
×
277
        }
×
278

279
        s.log.DebugS(ctx, "Sending msg", "msg", lnutils.SpewLogClosure(msg))
1✔
280

1✔
281
        // Otherwise, try to map the message using the default message mapper.
1✔
282
        // If we can't extract an event, then we'll return false to indicate
1✔
283
        // that the message wasn't processed.
1✔
284
        var processed bool
1✔
285
        s.cfg.MsgMapper.WhenSome(func(mapper MsgMapper[Event]) {
2✔
286
                event := mapper.MapMsg(msg)
1✔
287

1✔
288
                event.WhenSome(func(event Event) {
2✔
289
                        s.SendEvent(ctx, event)
1✔
290

1✔
291
                        processed = true
1✔
292
                })
1✔
293
        })
294

295
        return processed
1✔
296
}
297

298
// CurrentState returns the current state of the state machine.
299
func (s *StateMachine[Event, Env]) CurrentState() (State[Event, Env], error) {
25✔
300
        query := stateQuery[Event, Env]{
25✔
301
                CurrentState: make(chan State[Event, Env], 1),
25✔
302
        }
25✔
303

25✔
304
        if !fn.SendOrQuit(s.stateQuery, query, s.quit) {
25✔
305
                return nil, ErrStateMachineShutdown
×
306
        }
×
307

308
        return fn.RecvOrTimeout(query.CurrentState, time.Second)
25✔
309
}
310

311
// StateSubscriber represents an active subscription to be notified of new
312
// state transitions. It is obtained via RegisterStateEvents and released via
// RemoveStateSub.
313
type StateSubscriber[E any, F Environment] *fn.EventReceiver[State[E, F]]
314

315
// RegisterStateEvents registers a new event listener that will be notified of
316
// new state transitions.
317
func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[
318
        Event, Env] {
46✔
319

46✔
320
        subscriber := fn.NewEventReceiver[State[Event, Env]](10)
46✔
321

46✔
322
        // TODO(roasbeef): instead give the state and the input event?
46✔
323

46✔
324
        s.newStateEvents.RegisterSubscriber(subscriber)
46✔
325

46✔
326
        return subscriber
46✔
327
}
46✔
328

329
// RemoveStateSub removes the target state subscriber from the set of active
330
// subscribers.
331
func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[
332
        Event, Env]) {
46✔
333

46✔
334
        _ = s.newStateEvents.RemoveSubscriber(sub)
46✔
335
}
46✔
336

337
// executeDaemonEvent executes a daemon event, which is a special type of event
338
// that can be emitted as part of the state transition function of the state
339
// machine. An error is returned if the type of event is unknown.
340
func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
341
        event DaemonEvent) error {
73✔
342

73✔
343
        switch daemonEvent := event.(type) {
73✔
344
        // This is a send message event, so we'll send the event, and also mind
345
        // any preconditions as well as post-send events.
346
        case *SendMsgEvent[Event]:
20✔
347
                sendAndCleanUp := func() error {
39✔
348
                        s.log.DebugS(ctx, "Sending message:",
19✔
349
                                btclog.Hex6("target", daemonEvent.TargetPeer.SerializeCompressed()),
19✔
350
                                "messages", lnutils.SpewLogClosure(daemonEvent.Msgs))
19✔
351

19✔
352
                        err := s.cfg.Daemon.SendMessages(
19✔
353
                                daemonEvent.TargetPeer, daemonEvent.Msgs,
19✔
354
                        )
19✔
355
                        if err != nil {
19✔
356
                                return fmt.Errorf("unable to send msgs: %w",
×
357
                                        err)
×
358
                        }
×
359

360
                        // If a post-send event was specified, then we'll funnel
361
                        // that back into the main state machine now as well.
362
                        return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll
21✔
363
                                launched := s.gm.Go(
2✔
364
                                        ctx, func(ctx context.Context) {
4✔
365
                                                s.log.DebugS(ctx, "Sending post-send event",
2✔
366
                                                        "event", lnutils.SpewLogClosure(event))
2✔
367

2✔
368
                                                s.SendEvent(ctx, event)
2✔
369
                                        },
2✔
370
                                )
371

372
                                if !launched {
2✔
373
                                        return ErrStateMachineShutdown
×
374
                                }
×
375

376
                                return nil
2✔
377
                        })
378
                }
379

380
                canSend := func() bool {
25✔
381
                        return fn.MapOptionZ(
5✔
382
                                daemonEvent.SendWhen,
5✔
383
                                func(pred SendPredicate) bool {
10✔
384
                                        return pred()
5✔
385
                                },
5✔
386
                        )
387
                }
388

389
                // If this doesn't have a SendWhen predicate, or if it's already
390
                // true, then we can just send it off right away.
391
                if !daemonEvent.SendWhen.IsSome() || canSend() {
38✔
392
                        return sendAndCleanUp()
18✔
393
                }
18✔
394

395
                // Otherwise, this has a SendWhen predicate, so we'll need
396
                // launch a goroutine to poll the SendWhen, then send only once
397
                // the predicate is true.
398
                launched := s.gm.Go(ctx, func(ctx context.Context) {
4✔
399
                        predicateTicker := time.NewTicker(
2✔
400
                                s.cfg.CustomPollInterval.UnwrapOr(pollInterval),
2✔
401
                        )
2✔
402
                        defer predicateTicker.Stop()
2✔
403

2✔
404
                        s.log.InfoS(ctx, "Waiting for send predicate to be true")
2✔
405

2✔
406
                        for {
5✔
407
                                select {
3✔
408
                                case <-predicateTicker.C:
2✔
409
                                        if canSend() {
3✔
410
                                                s.log.InfoS(ctx, "Send active predicate")
1✔
411

1✔
412
                                                err := sendAndCleanUp()
1✔
413
                                                if err != nil {
1✔
414
                                                        s.log.ErrorS(ctx, "Unable to send message", err)
×
415
                                                }
×
416

417
                                                return
1✔
418
                                        }
419

420
                                case <-ctx.Done():
1✔
421
                                        return
1✔
422
                                }
423
                        }
424
                })
425

426
                if !launched {
2✔
427
                        return ErrStateMachineShutdown
×
428
                }
×
429

430
                return nil
2✔
431

432
        // If this is a broadcast transaction event, then we'll broadcast with
433
        // the label attached.
434
        case *BroadcastTxn:
11✔
435
                s.log.DebugS(ctx, "Broadcasting txn",
11✔
436
                        "txid", daemonEvent.Tx.TxHash())
11✔
437

11✔
438
                err := s.cfg.Daemon.BroadcastTransaction(
11✔
439
                        daemonEvent.Tx, daemonEvent.Label,
11✔
440
                )
11✔
441
                if err != nil {
11✔
UNCOV
442
                        log.Errorf("unable to broadcast txn: %v", err)
×
UNCOV
443
                }
×
444

445
                return nil
11✔
446

447
        // The state machine has requested a new event to be sent once a
448
        // transaction spending a specified outpoint has confirmed.
449
        case *RegisterSpend[Event]:
42✔
450
                s.log.DebugS(ctx, "Registering spend",
42✔
451
                        "outpoint", daemonEvent.OutPoint)
42✔
452

42✔
453
                spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn(
42✔
454
                        &daemonEvent.OutPoint, daemonEvent.PkScript,
42✔
455
                        daemonEvent.HeightHint,
42✔
456
                )
42✔
457
                if err != nil {
42✔
458
                        return fmt.Errorf("unable to register spend: %w", err)
×
459
                }
×
460

461
                launched := s.gm.Go(ctx, func(ctx context.Context) {
84✔
462
                        for {
84✔
463
                                select {
42✔
UNCOV
464
                                case spend, ok := <-spendEvent.Spend:
×
UNCOV
465
                                        if !ok {
×
UNCOV
466
                                                return
×
UNCOV
467
                                        }
×
468

469
                                        // If there's a post-send event, then
470
                                        // we'll send that into the current
471
                                        // state now.
UNCOV
472
                                        postSpend := daemonEvent.PostSpendEvent
×
UNCOV
473
                                        postSpend.WhenSome(func(f SpendMapper[Event]) { //nolint:ll
×
UNCOV
474
                                                customEvent := f(spend)
×
UNCOV
475
                                                s.SendEvent(ctx, customEvent)
×
UNCOV
476
                                        })
×
477

UNCOV
478
                                        return
×
479

480
                                case <-ctx.Done():
42✔
481
                                        return
42✔
482
                                }
483
                        }
484
                })
485

486
                if !launched {
42✔
487
                        return ErrStateMachineShutdown
×
488
                }
×
489

490
                return nil
42✔
491

492
        // The state machine has requested a new event to be sent once a
493
        // specified txid+pkScript pair has confirmed.
494
        case *RegisterConf[Event]:
×
495
                s.log.DebugS(ctx, "Registering conf",
×
496
                        "txid", daemonEvent.Txid)
×
497

×
498
                numConfs := daemonEvent.NumConfs.UnwrapOr(1)
×
499
                confEvent, err := s.cfg.Daemon.RegisterConfirmationsNtfn(
×
500
                        &daemonEvent.Txid, daemonEvent.PkScript,
×
501
                        numConfs, daemonEvent.HeightHint,
×
502
                )
×
503
                if err != nil {
×
504
                        return fmt.Errorf("unable to register conf: %w", err)
×
505
                }
×
506

507
                launched := s.gm.Go(ctx, func(ctx context.Context) {
×
508
                        for {
×
509
                                select {
×
510
                                case <-confEvent.Confirmed:
×
511
                                        // If there's a post-conf event, then
×
512
                                        // we'll send that into the current
×
513
                                        // state now.
×
514
                                        //
×
515
                                        // TODO(roasbeef): refactor to
×
516
                                        // dispatchAfterRecv w/ above
×
517
                                        postConf := daemonEvent.PostConfEvent
×
518
                                        postConf.WhenSome(func(e Event) {
×
519
                                                s.SendEvent(ctx, e)
×
520
                                        })
×
521

522
                                        return
×
523

524
                                case <-ctx.Done():
×
525
                                        return
×
526
                                }
527
                        }
528
                })
529

530
                if !launched {
×
531
                        return ErrStateMachineShutdown
×
532
                }
×
533

534
                return nil
×
535
        }
536

537
        return fmt.Errorf("unknown daemon event: %T", event)
×
538
}
539

540
// applyEvents applies a new event to the state machine. This will continue
541
// until no further events are emitted by the state machine. Along the way,
542
// we'll also ensure to execute any daemon events that are emitted.
543
func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
544
        currentState State[Event, Env], newEvent Event) (State[Event, Env],
545
        error) {
58✔
546

58✔
547
        s.log.DebugS(ctx, "Applying new event",
58✔
548
                "event", lnutils.SpewLogClosure(newEvent))
58✔
549

58✔
550
        eventQueue := fn.NewQueue(newEvent)
58✔
551

58✔
552
        // Given the next event to handle, we'll process the event, then add
58✔
553
        // any new emitted internal events to our event queue. This continues
58✔
554
        // until we reach a terminal state, or we run out of internal events to
58✔
555
        // process.
58✔
556
        //
58✔
557
        //nolint:ll
58✔
558
        for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() {
127✔
559
                err := fn.MapOptionZ(nextEvent, func(event Event) error {
138✔
560
                        s.log.DebugS(ctx, "Processing event",
69✔
561
                                "event", lnutils.SpewLogClosure(event))
69✔
562

69✔
563
                        // Apply the state transition function of the current
69✔
564
                        // state given this new event and our existing env.
69✔
565
                        transition, err := currentState.ProcessEvent(
69✔
566
                                event, s.cfg.Env,
69✔
567
                        )
69✔
568
                        if err != nil {
83✔
569
                                return err
14✔
570
                        }
14✔
571

572
                        newEvents := transition.NewEvents
55✔
573
                        err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error { //nolint:ll
88✔
574
                                // With the event processed, we'll process any
33✔
575
                                // new daemon events that were emitted as part
33✔
576
                                // of this new state transition.
33✔
577
                                for _, dEvent := range events.ExternalEvents {
63✔
578
                                        err := s.executeDaemonEvent(
30✔
579
                                                ctx, dEvent,
30✔
580
                                        )
30✔
581
                                        if err != nil {
30✔
582
                                                return err
×
583
                                        }
×
584
                                }
585

586
                                // Next, we'll add any new emitted events to our
587
                                // event queue.
588
                                //
589
                                //nolint:ll
590
                                for _, inEvent := range events.InternalEvent {
44✔
591
                                        s.log.DebugS(ctx, "Adding new internal event to queue",
11✔
592
                                                "event", lnutils.SpewLogClosure(inEvent))
11✔
593

11✔
594
                                        eventQueue.Enqueue(inEvent)
11✔
595
                                }
11✔
596

597
                                return nil
33✔
598
                        })
599
                        if err != nil {
55✔
600
                                return err
×
601
                        }
×
602

603
                        s.log.InfoS(ctx, "State transition",
55✔
604
                                btclog.Fmt("from_state", "%v", currentState),
55✔
605
                                btclog.Fmt("to_state", "%v", transition.NextState))
55✔
606

55✔
607
                        // With our events processed, we'll now update our
55✔
608
                        // internal state.
55✔
609
                        currentState = transition.NextState
55✔
610

55✔
611
                        // Notify our subscribers of the new state transition.
55✔
612
                        //
55✔
613
                        // TODO(roasbeef): will only give us the outer state?
55✔
614
                        //  * let FSMs choose which state to emit?
55✔
615
                        s.newStateEvents.NotifySubscribers(currentState)
55✔
616

55✔
617
                        return nil
55✔
618
                })
619
                if err != nil {
83✔
620
                        return currentState, err
14✔
621
                }
14✔
622
        }
623

624
        return currentState, nil
44✔
625
}
626

627
// driveMachine is the main event loop of the state machine. It accepts any new
628
// incoming events, and then drives the state machine forward until it reaches
629
// a terminal state.
630
func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
46✔
631
        s.log.DebugS(ctx, "Starting state machine")
46✔
632

46✔
633
        currentState := s.cfg.InitialState
46✔
634

46✔
635
        // Before we start, if we have an init daemon event specified, then
46✔
636
        // we'll handle that now.
46✔
637
        err := fn.MapOptionZ(s.cfg.InitEvent, func(event DaemonEvent) error {
89✔
638
                return s.executeDaemonEvent(ctx, event)
43✔
639
        })
43✔
640
        if err != nil {
46✔
641
                s.log.ErrorS(ctx, "Unable to execute init event", err)
×
642
                return
×
643
        }
×
644

645
        // We just started driving the state machine, so we'll notify our
646
        // subscribers of this starting state.
647
        s.newStateEvents.NotifySubscribers(currentState)
46✔
648

46✔
649
        for {
161✔
650
                select {
115✔
651
                // We have a new external event, so we'll drive the state
652
                // machine forward until we either run out of internal events,
653
                // or we reach a terminal state.
654
                case newEvent := <-s.events:
58✔
655
                        newState, err := s.applyEvents(
58✔
656
                                ctx, currentState, newEvent,
58✔
657
                        )
58✔
658
                        if err != nil {
72✔
659
                                s.cfg.ErrorReporter.ReportError(err)
14✔
660

14✔
661
                                s.log.ErrorS(ctx, "Unable to apply event", err)
14✔
662

14✔
663
                                // An error occurred, so we'll tear down the
14✔
664
                                // entire state machine as we can't proceed.
14✔
665
                                go s.Stop()
14✔
666

14✔
667
                                return
14✔
668
                        }
14✔
669

670
                        currentState = newState
44✔
671

672
                // An outside caller is querying our state, so we'll return the
673
                // latest state.
674
                case stateQuery := <-s.stateQuery:
25✔
675
                        if !fn.SendOrQuit(stateQuery.CurrentState, currentState, s.quit) { //nolint:ll
25✔
676
                                return
×
677
                        }
×
678

679
                case <-s.gm.Done():
32✔
680
                        return
32✔
681
                }
682
        }
683
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc