• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 19087480965

05 Nov 2025 12:44AM UTC coverage: 56.868%. First build
19087480965

Pull #10346

github

web-flow
Merge 1387ac1e1 into dc5d57f28
Pull Request #10346: protofsm: extend EmittedEvents with new Outbox field

10 of 74 new or added lines in 1 file covered. (13.51%)

99641 of 175213 relevant lines covered (56.87%)

1.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.13
/protofsm/state_machine.go
1
package protofsm
2

3
import (
4
        "context"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcec/v2"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btclog/v2"
14
        "github.com/lightningnetwork/lnd/actor"
15
        "github.com/lightningnetwork/lnd/chainntnfs"
16
        "github.com/lightningnetwork/lnd/fn/v2"
17
        "github.com/lightningnetwork/lnd/lnutils"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        "github.com/lightningnetwork/lnd/msgmux"
20
)
21

22
const (
23
        // pollInterval is the interval at which we'll poll the SendWhen
24
        // predicate if specified.
25
        pollInterval = time.Millisecond * 100
26
)
27

28
var (
29
        // ErrStateMachineShutdown occurs when trying to feed an event to a
30
        // StateMachine that has been asked to Stop.
31
        ErrStateMachineShutdown = fmt.Errorf("StateMachine is shutting down")
32
)
33

34
// EmittedEvent is a special type that can be emitted by a state transition.
35
// This can container internal events which are to be routed back to the state,
36
// or external events which are to be sent to the daemon.
37
type EmittedEvent[Event any] struct {
38
        // InternalEvent is an optional internal event that is to be routed
39
        // back to the target state. This enables state to trigger one or many
40
        // state transitions without a new external event.
41
        InternalEvent []Event
42

43
        // ExternalEvent is an optional external event that is to be sent to
44
        // the daemon for dispatch. Usually, this is some form of I/O.
45
        ExternalEvents DaemonEventSet
46

47
        // Outbox is an optional set of events that are accumulated during event
48
        // processing and returned to the caller for processing into the main
49
        // state machine. This enables nested state machines to emit events that
50
        // bubble up to their parent.
51
        Outbox []Event
52
}
53

54
// StateTransition is a state transition type. It denotes the next state to go
55
// to, and also the set of events to emit.
56
type StateTransition[Event any, Env Environment] struct {
57
        // NextState is the next state to transition to.
58
        NextState State[Event, Env]
59

60
        // NewEvents is the set of events to emit.
61
        NewEvents fn.Option[EmittedEvent[Event]]
62
}
63

64
// Environment is an abstract interface that represents the environment that
65
// the state machine will execute using. From the PoV of the main state machine
66
// executor, we just care about being able to clean up any resources that were
67
// allocated by the environment.
68
type Environment interface {
69
        // Name returns the name of the environment. This is used to uniquely
70
        // identify the environment of related state machines.
71
        Name() string
72
}
73

74
// State defines an abstract state along, namely its state transition function
75
// that takes as input an event and an environment, and returns a state
76
// transition (next state, and set of events to emit). As state can also either
77
// be terminal, or not, a terminal event causes state execution to halt.
78
type State[Event any, Env Environment] interface {
79
        // ProcessEvent takes an event and an environment, and returns a new
80
        // state transition. This will be iteratively called until either a
81
        // terminal state is reached, or no further internal events are
82
        // emitted.
83
        ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error)
84

85
        // IsTerminal returns true if this state is terminal, and false
86
        // otherwise.
87
        IsTerminal() bool
88

89
        // String returns a human readable string that represents the state.
90
        String() string
91
}
92

93
// DaemonAdapters is a set of methods that server as adapters to bridge the
94
// pure world of the FSM to the real world of the daemon. These will be used to
95
// do things like broadcast transactions, or send messages to peers.
96
type DaemonAdapters interface {
97
        // SendMessages sends the target set of messages to the target peer.
98
        SendMessages(btcec.PublicKey, []lnwire.Message) error
99

100
        // BroadcastTransaction broadcasts a transaction with the target label.
101
        BroadcastTransaction(*wire.MsgTx, string) error
102

103
        // RegisterConfirmationsNtfn registers an intent to be notified once
104
        // txid reaches numConfs confirmations. We also pass in the pkScript as
105
        // the default light client instead needs to match on scripts created
106
        // in the block. If a nil txid is passed in, then not only should we
107
        // match on the script, but we should also dispatch once the
108
        // transaction containing the script reaches numConfs confirmations.
109
        // This can be useful in instances where we only know the script in
110
        // advance, but not the transaction containing it.
111
        //
112
        // TODO(roasbeef): could abstract further?
113
        RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte,
114
                numConfs, heightHint uint32,
115
                opts ...chainntnfs.NotifierOption) (
116
                *chainntnfs.ConfirmationEvent, error)
117

118
        // RegisterSpendNtfn registers an intent to be notified once the target
119
        // outpoint is successfully spent within a transaction. The script that
120
        // the outpoint creates must also be specified. This allows this
121
        // interface to be implemented by BIP 158-like filtering.
122
        RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte,
123
                heightHint uint32) (*chainntnfs.SpendEvent, error)
124
}
125

126
// stateQuery is used by outside callers to query the internal state of the
127
// state machine.
128
type stateQuery[Event any, Env Environment] struct {
129
        // CurrentState is a channel that will be sent the current state of the
130
        // state machine.
131
        CurrentState chan State[Event, Env]
132
}
133

134
// syncEventRequest is used to send an event to the state machine synchronously,
135
// waiting for the event processing to complete and returning the accumulated
136
// outbox events.
137
type syncEventRequest[Event any] struct {
138
        // event is the event to process.
139
        event Event
140

141
        // promise is used to signal completion and return the accumulated
142
        // outbox events or an error.
143
        promise actor.Promise[[]Event]
144
}
145

146
// StateMachine represents an abstract FSM that is able to process new incoming
147
// events and drive a state machine to termination. This implementation uses
148
// type params to abstract over the types of events and environment. Events
149
// trigger new state transitions, that use the environment to perform some
150
// action.
151
//
152
// TODO(roasbeef): terminal check, daemon event execution, init?
153
type StateMachine[Event any, Env Environment] struct {
154
        cfg StateMachineCfg[Event, Env]
155

156
        log btclog.Logger
157

158
        // events is the channel that will be used to send new events to the
159
        // FSM.
160
        events chan Event
161

162
        // syncEvents is the channel that will be used to send synchronous event
163
        // requests to the FSM, returning the accumulated outbox events.
164
        syncEvents chan syncEventRequest[Event]
165

166
        // newStateEvents is an EventDistributor that will be used to notify
167
        // any relevant callers of new state transitions that occur.
168
        newStateEvents *fn.EventDistributor[State[Event, Env]]
169

170
        // stateQuery is a channel that will be used by outside callers to
171
        // query the internal state machine state.
172
        stateQuery chan stateQuery[Event, Env]
173

174
        gm   fn.GoroutineManager
175
        quit chan struct{}
176

177
        // startOnce and stopOnce are used to ensure that the state machine is
178
        // only started and stopped once.
179
        startOnce sync.Once
180
        stopOnce  sync.Once
181

182
        // running is a flag that indicates if the state machine is currently
183
        // running.
184
        running atomic.Bool
185
}
186

187
// ErrorReporter is an interface that's used to report errors that occur during
188
// state machine execution.
189
type ErrorReporter interface {
190
        // ReportError is a method that's used to report an error that occurred
191
        // during state machine execution.
192
        ReportError(err error)
193
}
194

195
// StateMachineCfg is a configuration struct that's used to create a new state
196
// machine.
197
type StateMachineCfg[Event any, Env Environment] struct {
198
        // ErrorReporter is used to report errors that occur during state
199
        // transitions.
200
        ErrorReporter ErrorReporter
201

202
        // Daemon is a set of adapters that will be used to bridge the FSM to
203
        // the daemon.
204
        Daemon DaemonAdapters
205

206
        // InitialState is the initial state of the state machine.
207
        InitialState State[Event, Env]
208

209
        // Env is the environment that the state machine will use to execute.
210
        Env Env
211

212
        // InitEvent is an optional event that will be sent to the state
213
        // machine as if it was emitted at the onset of the state machine. This
214
        // can be used to set up tracking state such as a txid confirmation
215
        // event.
216
        InitEvent fn.Option[DaemonEvent]
217

218
        // MsgMapper is an optional message mapper that can be used to map
219
        // normal wire messages into FSM events.
220
        MsgMapper fn.Option[MsgMapper[Event]]
221

222
        // CustomPollInterval is an optional custom poll interval that can be
223
        // used to set a quicker interval for tests.
224
        CustomPollInterval fn.Option[time.Duration]
225
}
226

227
// NewStateMachine creates a new state machine given a set of daemon adapters,
228
// an initial state, an environment, and an event to process as if emitted at
229
// the onset of the state machine. Such an event can be used to set up tracking
230
// state such as a txid confirmation event.
231
func NewStateMachine[Event any, Env Environment](
232
        cfg StateMachineCfg[Event, Env]) StateMachine[Event, Env] {
3✔
233

3✔
234
        return StateMachine[Event, Env]{
3✔
235
                cfg: cfg,
3✔
236
                log: log.WithPrefix(
3✔
237
                        fmt.Sprintf("FSM(%v):", cfg.Env.Name()),
3✔
238
                ),
3✔
239
                events:         make(chan Event, 1),
3✔
240
                syncEvents:     make(chan syncEventRequest[Event], 1),
3✔
241
                stateQuery:     make(chan stateQuery[Event, Env]),
3✔
242
                gm:             *fn.NewGoroutineManager(),
3✔
243
                newStateEvents: fn.NewEventDistributor[State[Event, Env]](),
3✔
244
                quit:           make(chan struct{}),
3✔
245
        }
3✔
246
}
3✔
247

248
// Start starts the state machine. This will spawn a goroutine that will drive
249
// the state machine to completion.
250
func (s *StateMachine[Event, Env]) Start(ctx context.Context) {
3✔
251
        s.startOnce.Do(func() {
6✔
252
                _ = s.gm.Go(ctx, func(ctx context.Context) {
6✔
253
                        s.driveMachine(ctx)
3✔
254
                })
3✔
255

256
                s.running.Store(true)
3✔
257
        })
258
}
259

260
// Stop stops the state machine. This will block until the state machine has
261
// reached a stopping point.
262
func (s *StateMachine[Event, Env]) Stop() {
×
263
        s.stopOnce.Do(func() {
×
264
                close(s.quit)
×
265
                s.gm.Stop()
×
266

×
267
                s.running.Store(false)
×
268
        })
×
269
}
270

271
// SendEvent sends a new event to the state machine.
272
//
273
// TODO(roasbeef): bool if processed?
274
func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) {
3✔
275
        s.log.Debugf("Sending event %T", event)
3✔
276

3✔
277
        select {
3✔
278
        case s.events <- event:
3✔
279
        case <-ctx.Done():
×
280
                return
×
281
        case <-s.quit:
×
282
                return
×
283
        }
284
}
285

286
// AskEvent sends a new event to the state machine using the Ask pattern
287
// (request-response), waiting for the event to be fully processed. It
288
// returns a Future that will be resolved with the accumulated outbox events
289
// from all state transitions triggered by this event, including nested
290
// internal events. The Future's Await method will return fn.Result[[]Event]
291
// containing either the accumulated outbox events or an error if processing
292
// failed.
293
func (s *StateMachine[Event, Env]) AskEvent(ctx context.Context,
NEW
294
        event Event) actor.Future[[]Event] {
×
NEW
295

×
NEW
296
        s.log.Debugf("Asking event %T", event)
×
NEW
297

×
NEW
298
        // Create a promise to signal completion and return results.
×
NEW
299
        promise := actor.NewPromise[[]Event]()
×
NEW
300

×
NEW
301
        req := syncEventRequest[Event]{
×
NEW
302
                event:   event,
×
NEW
303
                promise: promise,
×
NEW
304
        }
×
NEW
305

×
NEW
306
        // Check for context cancellation or shutdown first to avoid races.
×
NEW
307
        select {
×
NEW
308
        case <-ctx.Done():
×
NEW
309
                promise.Complete(
×
NEW
310
                        fn.Errf[[]Event]("context cancelled: %w",
×
NEW
311
                                ctx.Err()),
×
NEW
312
                )
×
NEW
313

×
NEW
314
                return promise.Future()
×
315

NEW
316
        case <-s.quit:
×
NEW
317
                promise.Complete(fn.Err[[]Event](ErrStateMachineShutdown))
×
NEW
318

×
NEW
319
                return promise.Future()
×
320

NEW
321
        default:
×
322
        }
323

324
        // Send the request to the state machine. If we can't send it due to
325
        // context cancellation or shutdown, complete the promise with an error.
NEW
326
        select {
×
327
        // Successfully sent, the promise will be completed by driveMachine.
NEW
328
        case s.syncEvents <- req:
×
329

NEW
330
        case <-ctx.Done():
×
NEW
331
                promise.Complete(
×
NEW
332
                        fn.Errf[[]Event]("context cancelled: %w",
×
NEW
333
                                ctx.Err()),
×
NEW
334
                )
×
335

NEW
336
        case <-s.quit:
×
NEW
337
                promise.Complete(fn.Err[[]Event](ErrStateMachineShutdown))
×
338
        }
339

NEW
340
        return promise.Future()
×
341
}
342

343
// CanHandle returns true if the target message can be routed to the state
344
// machine.
345
func (s *StateMachine[Event, Env]) CanHandle(msg msgmux.PeerMsg) bool {
3✔
346
        cfgMapper := s.cfg.MsgMapper
3✔
347
        return fn.MapOptionZ(cfgMapper, func(mapper MsgMapper[Event]) bool {
6✔
348
                return mapper.MapMsg(msg).IsSome()
3✔
349
        })
3✔
350
}
351

352
// Name returns the name of the state machine's environment.
353
func (s *StateMachine[Event, Env]) Name() string {
3✔
354
        return s.cfg.Env.Name()
3✔
355
}
3✔
356

357
// SendMessage attempts to send a wire message to the state machine. If the
358
// message can be mapped using the default message mapper, then true is
359
// returned indicating that the message was processed. Otherwise, false is
360
// returned.
361
func (s *StateMachine[Event, Env]) SendMessage(ctx context.Context,
362
        msg msgmux.PeerMsg) bool {
3✔
363

3✔
364
        // If we have no message mapper, then return false as we can't process
3✔
365
        // this message.
3✔
366
        if !s.cfg.MsgMapper.IsSome() {
3✔
367
                return false
×
368
        }
×
369

370
        s.log.DebugS(ctx, "Sending msg", "msg", lnutils.SpewLogClosure(msg))
3✔
371

3✔
372
        // Otherwise, try to map the message using the default message mapper.
3✔
373
        // If we can't extract an event, then we'll return false to indicate
3✔
374
        // that the message wasn't processed.
3✔
375
        var processed bool
3✔
376
        s.cfg.MsgMapper.WhenSome(func(mapper MsgMapper[Event]) {
6✔
377
                event := mapper.MapMsg(msg)
3✔
378

3✔
379
                event.WhenSome(func(event Event) {
6✔
380
                        s.SendEvent(ctx, event)
3✔
381

3✔
382
                        processed = true
3✔
383
                })
3✔
384
        })
385

386
        return processed
3✔
387
}
388

389
// CurrentState returns the current state of the state machine.
390
func (s *StateMachine[Event, Env]) CurrentState() (State[Event, Env], error) {
3✔
391
        query := stateQuery[Event, Env]{
3✔
392
                CurrentState: make(chan State[Event, Env], 1),
3✔
393
        }
3✔
394

3✔
395
        if !fn.SendOrQuit(s.stateQuery, query, s.quit) {
3✔
396
                return nil, ErrStateMachineShutdown
×
397
        }
×
398

399
        return fn.RecvOrTimeout(query.CurrentState, time.Second)
3✔
400
}
401

402
// StateSubscriber represents an active subscription to be notified of new
403
// state transitions.
404
type StateSubscriber[E any, F Environment] *fn.EventReceiver[State[E, F]]
405

406
// RegisterStateEvents registers a new event listener that will be notified of
407
// new state transitions.
408
func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[
409
        Event, Env] {
3✔
410

3✔
411
        subscriber := fn.NewEventReceiver[State[Event, Env]](10)
3✔
412

3✔
413
        // TODO(roasbeef): instead give the state and the input event?
3✔
414

3✔
415
        s.newStateEvents.RegisterSubscriber(subscriber)
3✔
416

3✔
417
        return subscriber
3✔
418
}
3✔
419

420
// RemoveStateSub removes the target state subscriber from the set of active
421
// subscribers.
422
func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[
423
        Event, Env]) {
3✔
424

3✔
425
        _ = s.newStateEvents.RemoveSubscriber(sub)
3✔
426
}
3✔
427

428
// IsRunning returns true if the state machine is currently running.
429
func (s *StateMachine[Event, Env]) IsRunning() bool {
×
430
        return s.running.Load()
×
431
}
×
432

433
// executeDaemonEvent executes a daemon event, which is a special type of event
434
// that can be emitted as part of the state transition function of the state
435
// machine. An error is returned if the type of event is unknown.
436
func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
437
        event DaemonEvent) error {
3✔
438

3✔
439
        switch daemonEvent := event.(type) {
3✔
440
        // This is a send message event, so we'll send the event, and also mind
441
        // any preconditions as well as post-send events.
442
        case *SendMsgEvent[Event]:
3✔
443
                sendAndCleanUp := func() error {
6✔
444
                        s.log.DebugS(ctx, "Sending message:",
3✔
445
                                btclog.Hex6("target", daemonEvent.TargetPeer.SerializeCompressed()),
3✔
446
                                "messages", lnutils.SpewLogClosure(daemonEvent.Msgs))
3✔
447

3✔
448
                        err := s.cfg.Daemon.SendMessages(
3✔
449
                                daemonEvent.TargetPeer, daemonEvent.Msgs,
3✔
450
                        )
3✔
451
                        if err != nil {
3✔
452
                                return fmt.Errorf("unable to send msgs: %w",
×
453
                                        err)
×
454
                        }
×
455

456
                        // If a post-send event was specified, then we'll funnel
457
                        // that back into the main state machine now as well.
458
                        //nolint:ll
459
                        return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error {
6✔
460
                                launched := s.gm.Go(
3✔
461
                                        ctx, func(ctx context.Context) {
6✔
462
                                                s.log.DebugS(ctx, "Sending post-send event",
3✔
463
                                                        "event", lnutils.SpewLogClosure(event))
3✔
464

3✔
465
                                                s.SendEvent(ctx, event)
3✔
466
                                        },
3✔
467
                                )
468

469
                                if !launched {
3✔
470
                                        return ErrStateMachineShutdown
×
471
                                }
×
472

473
                                return nil
3✔
474
                        })
475
                }
476

477
                canSend := func() bool {
6✔
478
                        return fn.MapOptionZ(
3✔
479
                                daemonEvent.SendWhen,
3✔
480
                                func(pred SendPredicate) bool {
6✔
481
                                        return pred()
3✔
482
                                },
3✔
483
                        )
484
                }
485

486
                // If this doesn't have a SendWhen predicate, or if it's already
487
                // true, then we can just send it off right away.
488
                if !daemonEvent.SendWhen.IsSome() || canSend() {
6✔
489
                        return sendAndCleanUp()
3✔
490
                }
3✔
491

492
                // Otherwise, this has a SendWhen predicate, so we'll need
493
                // launch a goroutine to poll the SendWhen, then send only once
494
                // the predicate is true.
495
                launched := s.gm.Go(ctx, func(ctx context.Context) {
×
496
                        predicateTicker := time.NewTicker(
×
497
                                s.cfg.CustomPollInterval.UnwrapOr(pollInterval),
×
498
                        )
×
499
                        defer predicateTicker.Stop()
×
500

×
501
                        s.log.InfoS(ctx, "Waiting for send predicate to be true")
×
502

×
503
                        for {
×
504
                                select {
×
505
                                case <-predicateTicker.C:
×
506
                                        if canSend() {
×
507
                                                s.log.InfoS(ctx, "Send active predicate")
×
508

×
509
                                                err := sendAndCleanUp()
×
510
                                                if err != nil {
×
511
                                                        s.log.ErrorS(ctx, "Unable to send message", err)
×
512
                                                }
×
513

514
                                                return
×
515
                                        }
516

517
                                case <-ctx.Done():
×
518
                                        return
×
519
                                }
520
                        }
521
                })
522

523
                if !launched {
×
524
                        return ErrStateMachineShutdown
×
525
                }
×
526

527
                return nil
×
528

529
        // If this is a broadcast transaction event, then we'll broadcast with
530
        // the label attached.
531
        case *BroadcastTxn:
3✔
532
                s.log.DebugS(ctx, "Broadcasting txn",
3✔
533
                        "txid", daemonEvent.Tx.TxHash())
3✔
534

3✔
535
                err := s.cfg.Daemon.BroadcastTransaction(
3✔
536
                        daemonEvent.Tx, daemonEvent.Label,
3✔
537
                )
3✔
538
                if err != nil {
6✔
539
                        log.Errorf("unable to broadcast txn: %v", err)
3✔
540
                }
3✔
541

542
                return nil
3✔
543

544
        // The state machine has requested a new event to be sent once a
545
        // transaction spending a specified outpoint has confirmed.
546
        case *RegisterSpend[Event]:
3✔
547
                s.log.DebugS(ctx, "Registering spend",
3✔
548
                        "outpoint", daemonEvent.OutPoint)
3✔
549

3✔
550
                spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn(
3✔
551
                        &daemonEvent.OutPoint, daemonEvent.PkScript,
3✔
552
                        daemonEvent.HeightHint,
3✔
553
                )
3✔
554
                if err != nil {
3✔
555
                        return fmt.Errorf("unable to register spend: %w", err)
×
556
                }
×
557

558
                launched := s.gm.Go(ctx, func(ctx context.Context) {
6✔
559
                        for {
6✔
560
                                select {
3✔
561
                                case spend, ok := <-spendEvent.Spend:
3✔
562
                                        if !ok {
6✔
563
                                                return
3✔
564
                                        }
3✔
565

566
                                        // If there's a post-send event, then
567
                                        // we'll send that into the current
568
                                        // state now.
569
                                        postSpend := daemonEvent.PostSpendEvent
3✔
570
                                        postSpend.WhenSome(func(f SpendMapper[Event]) { //nolint:ll
6✔
571
                                                customEvent := f(spend)
3✔
572
                                                s.SendEvent(ctx, customEvent)
3✔
573
                                        })
3✔
574

575
                                        return
3✔
576

577
                                case <-ctx.Done():
×
578
                                        return
×
579
                                }
580
                        }
581
                })
582

583
                if !launched {
3✔
584
                        return ErrStateMachineShutdown
×
585
                }
×
586

587
                return nil
3✔
588

589
        // The state machine has requested a new event to be sent once a
590
        // specified txid+pkScript pair has confirmed.
591
        case *RegisterConf[Event]:
×
592
                s.log.DebugS(ctx, "Registering conf",
×
593
                        "txid", daemonEvent.Txid)
×
594

×
595
                var opts []chainntnfs.NotifierOption
×
596
                if daemonEvent.FullBlock {
×
597
                        opts = append(opts, chainntnfs.WithIncludeBlock())
×
598
                }
×
599

600
                numConfs := daemonEvent.NumConfs.UnwrapOr(1)
×
601
                confEvent, err := s.cfg.Daemon.RegisterConfirmationsNtfn(
×
602
                        &daemonEvent.Txid, daemonEvent.PkScript,
×
603
                        numConfs, daemonEvent.HeightHint, opts...,
×
604
                )
×
605
                if err != nil {
×
606
                        return fmt.Errorf("unable to register conf: %w", err)
×
607
                }
×
608

609
                launched := s.gm.Go(ctx, func(ctx context.Context) {
×
610
                        for {
×
611
                                select {
×
612
                                //nolint:ll
613
                                case conf, ok := <-confEvent.Confirmed:
×
614
                                        if !ok {
×
615
                                                return
×
616
                                        }
×
617

618
                                        // If there's a post-conf mapper, then
619
                                        // we'll send that into the current
620
                                        // state now.
621
                                        postConfMapper := daemonEvent.PostConfMapper
×
622
                                        postConfMapper.WhenSome(func(f ConfMapper[Event]) {
×
623
                                                customEvent := f(conf)
×
624
                                                s.SendEvent(ctx, customEvent)
×
625
                                        })
×
626

627
                                        return
×
628

629
                                case <-ctx.Done():
×
630
                                        return
×
631
                                }
632
                        }
633
                })
634

635
                if !launched {
×
636
                        return ErrStateMachineShutdown
×
637
                }
×
638

639
                return nil
×
640
        }
641

642
        return fmt.Errorf("unknown daemon event: %T", event)
×
643
}
644

645
// applyEvents applies a new event to the state machine. This will continue
646
// until no further events are emitted by the state machine. Along the way,
647
// we'll also ensure to execute any daemon events that are emitted. The
648
// function returns the final state, any accumulated outbox events, and an
649
// error if one occurred.
650
func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
651
        currentState State[Event, Env], newEvent Event) (State[Event, Env],
652
        []Event, error) {
3✔
653

3✔
654
        eventQueue := fn.NewQueue(newEvent)
3✔
655

3✔
656
        // outbox accumulates all outbox events from state transitions during
3✔
657
        // the entire event processing chain.
3✔
658
        var outbox []Event
3✔
659

3✔
660
        // Given the next event to handle, we'll process the event, then add
3✔
661
        // any new emitted internal events to our event queue. This continues
3✔
662
        // until we reach a terminal state, or we run out of internal events to
3✔
663
        // process.
3✔
664
        //
3✔
665
        //nolint:ll
3✔
666
        for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() {
6✔
667
                err := fn.MapOptionZ(nextEvent, func(event Event) error {
6✔
668
                        s.log.DebugS(ctx, "Processing event",
3✔
669
                                "event", lnutils.SpewLogClosure(event))
3✔
670

3✔
671
                        // Apply the state transition function of the current
3✔
672
                        // state given this new event and our existing env.
3✔
673
                        transition, err := currentState.ProcessEvent(
3✔
674
                                event, s.cfg.Env,
3✔
675
                        )
3✔
676
                        if err != nil {
3✔
677
                                return err
×
678
                        }
×
679

680
                        newEvents := transition.NewEvents
3✔
681
                        err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error {
6✔
682
                                // With the event processed, we'll process any
3✔
683
                                // new daemon events that were emitted as part
3✔
684
                                // of this new state transition.
3✔
685
                                for _, dEvent := range events.ExternalEvents {
6✔
686
                                        err := s.executeDaemonEvent(
3✔
687
                                                ctx, dEvent,
3✔
688
                                        )
3✔
689
                                        if err != nil {
3✔
690
                                                return err
×
691
                                        }
×
692
                                }
693

694
                                // Next, we'll add any new emitted events to our
695
                                // event queue.
696
                                for _, inEvent := range events.InternalEvent {
6✔
697
                                        s.log.DebugS(ctx, "Adding new internal event to queue",
3✔
698
                                                "event", lnutils.SpewLogClosure(inEvent))
3✔
699

3✔
700
                                        eventQueue.Enqueue(inEvent)
3✔
701
                                }
3✔
702

703
                                // Accumulate any outbox events from this state
704
                                // transition.
705
                                outbox = append(outbox, events.Outbox...)
3✔
706

3✔
707
                                return nil
3✔
708
                        })
709
                        if err != nil {
3✔
710
                                return err
×
711
                        }
×
712

713
                        s.log.InfoS(ctx, "State transition",
3✔
714
                                btclog.Fmt("from_state", "%v", currentState),
3✔
715
                                btclog.Fmt("to_state", "%v", transition.NextState))
3✔
716

3✔
717
                        // With our events processed, we'll now update our
3✔
718
                        // internal state.
3✔
719
                        currentState = transition.NextState
3✔
720

3✔
721
                        // Notify our subscribers of the new state transition.
3✔
722
                        //
3✔
723
                        // TODO(roasbeef): will only give us the outer state?
3✔
724
                        //  * let FSMs choose which state to emit?
3✔
725
                        s.newStateEvents.NotifySubscribers(currentState)
3✔
726

3✔
727
                        return nil
3✔
728
                })
729
                if err != nil {
3✔
NEW
730
                        return currentState, nil, err
×
731
                }
×
732
        }
733

734
        return currentState, outbox, nil
3✔
735
}
736

737
// driveMachine is the main event loop of the state machine. It accepts any new
738
// incoming events, and then drives the state machine forward until it reaches
739
// a terminal state.
740
func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
3✔
741
        s.log.DebugS(ctx, "Starting state machine")
3✔
742

3✔
743
        currentState := s.cfg.InitialState
3✔
744

3✔
745
        // Before we start, if we have an init daemon event specified, then
3✔
746
        // we'll handle that now.
3✔
747
        err := fn.MapOptionZ(s.cfg.InitEvent, func(event DaemonEvent) error {
6✔
748
                return s.executeDaemonEvent(ctx, event)
3✔
749
        })
3✔
750
        if err != nil {
3✔
751
                s.log.ErrorS(ctx, "Unable to execute init event", err)
×
752
                return
×
753
        }
×
754

755
        // We just started driving the state machine, so we'll notify our
756
        // subscribers of this starting state.
757
        s.newStateEvents.NotifySubscribers(currentState)
3✔
758

3✔
759
        for {
6✔
760
                select {
3✔
761
                // We have a new external event, so we'll drive the state
762
                // machine forward until we either run out of internal events,
763
                // or we reach a terminal state.
764
                case newEvent := <-s.events:
3✔
765
                        newState, _, err := s.applyEvents(
3✔
766
                                ctx, currentState, newEvent,
3✔
767
                        )
3✔
768
                        if err != nil {
3✔
769
                                s.cfg.ErrorReporter.ReportError(err)
×
770

×
771
                                s.log.ErrorS(ctx, "Unable to apply event", err)
×
772

×
773
                                // An error occurred, so we'll tear down the
×
774
                                // entire state machine as we can't proceed.
×
775
                                go s.Stop()
×
776

×
777
                                return
×
778
                        }
×
779

780
                        currentState = newState
3✔
781

782
                // We have a synchronous event request that expects the
783
                // accumulated outbox events to be returned via the promise.
NEW
784
                case syncReq := <-s.syncEvents:
×
NEW
785
                        newState, outbox, err := s.applyEvents(
×
NEW
786
                                ctx, currentState, syncReq.event,
×
NEW
787
                        )
×
NEW
788
                        if err != nil {
×
NEW
789
                                s.cfg.ErrorReporter.ReportError(err)
×
NEW
790

×
NEW
791
                                s.log.ErrorS(ctx, "Unable to apply sync event",
×
NEW
792
                                        err)
×
NEW
793

×
NEW
794
                                // Complete the promise with the error.
×
NEW
795
                                //
×
NEW
796
                                // TODO(roasbeef): distinguish between error
×
NEW
797
                                // types? state vs processing
×
NEW
798
                                syncReq.promise.Complete(fn.Err[[]Event](err))
×
NEW
799

×
NEW
800
                                // An error occurred, so we'll tear down the
×
NEW
801
                                // entire state machine as we can't proceed.
×
NEW
802
                                go s.Stop()
×
NEW
803

×
NEW
804
                                return
×
NEW
805
                        }
×
806

NEW
807
                        currentState = newState
×
NEW
808

×
NEW
809
                        // Complete the promise with the accumulated outbox
×
NEW
810
                        // events.
×
NEW
811
                        syncReq.promise.Complete(fn.Ok(outbox))
×
812

813
                // An outside caller is querying our state, so we'll return the
814
                // latest state.
815
                case stateQuery := <-s.stateQuery:
3✔
816
                        if !fn.SendOrQuit(
3✔
817
                                stateQuery.CurrentState, currentState, s.quit,
3✔
818
                        ) {
3✔
819

×
820
                                return
×
821
                        }
×
822

823
                case <-s.gm.Done():
×
824
                        return
×
825
                }
826
        }
827
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc