• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15561477203

10 Jun 2025 01:54PM UTC coverage: 58.351% (-10.1%) from 68.487%
15561477203

Pull #9356

github

web-flow
Merge 6440b25db into c6d6d4c0b
Pull Request #9356: lnrpc: add incoming/outgoing channel ids filter to forwarding history request

33 of 36 new or added lines in 2 files covered. (91.67%)

28366 existing lines in 455 files now uncovered.

97715 of 167461 relevant lines covered (58.35%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.67
/protofsm/state_machine.go
1
package protofsm
2

3
import (
4
        "context"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcec/v2"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btclog/v2"
14
        "github.com/lightningnetwork/lnd/chainntnfs"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/lnutils"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
        "github.com/lightningnetwork/lnd/msgmux"
19
)
20

21
const (
22
        // pollInterval is the interval at which we'll poll the SendWhen
23
        // predicate if specified.
24
        pollInterval = time.Millisecond * 100
25
)
26

27
var (
28
        // ErrStateMachineShutdown occurs when trying to feed an event to a
29
        // StateMachine that has been asked to Stop.
30
        ErrStateMachineShutdown = fmt.Errorf("StateMachine is shutting down")
31
)
32

33
// EmittedEvent is a special type that can be emitted by a state transition.
34
// This can container internal events which are to be routed back to the state,
35
// or external events which are to be sent to the daemon.
36
type EmittedEvent[Event any] struct {
37
        // InternalEvent is an optional internal event that is to be routed
38
        // back to the target state. This enables state to trigger one or many
39
        // state transitions without a new external event.
40
        InternalEvent []Event
41

42
        // ExternalEvent is an optional external event that is to be sent to
43
        // the daemon for dispatch. Usually, this is some form of I/O.
44
        ExternalEvents DaemonEventSet
45
}
46

47
// StateTransition is a state transition type. It denotes the next state to go
48
// to, and also the set of events to emit.
49
type StateTransition[Event any, Env Environment] struct {
50
        // NextState is the next state to transition to.
51
        NextState State[Event, Env]
52

53
        // NewEvents is the set of events to emit.
54
        NewEvents fn.Option[EmittedEvent[Event]]
55
}
56

57
// Environment is an abstract interface that represents the environment that
58
// the state machine will execute using. From the PoV of the main state machine
59
// executor, we just care about being able to clean up any resources that were
60
// allocated by the environment.
61
type Environment interface {
62
        // Name returns the name of the environment. This is used to uniquely
63
        // identify the environment of related state machines.
64
        Name() string
65
}
66

67
// State defines an abstract state along, namely its state transition function
68
// that takes as input an event and an environment, and returns a state
69
// transition (next state, and set of events to emit). As state can also either
70
// be terminal, or not, a terminal event causes state execution to halt.
71
type State[Event any, Env Environment] interface {
72
        // ProcessEvent takes an event and an environment, and returns a new
73
        // state transition. This will be iteratively called until either a
74
        // terminal state is reached, or no further internal events are
75
        // emitted.
76
        ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error)
77

78
        // IsTerminal returns true if this state is terminal, and false
79
        // otherwise.
80
        IsTerminal() bool
81

82
        // String returns a human readable string that represents the state.
83
        String() string
84
}
85

86
// DaemonAdapters is a set of methods that server as adapters to bridge the
87
// pure world of the FSM to the real world of the daemon. These will be used to
88
// do things like broadcast transactions, or send messages to peers.
89
type DaemonAdapters interface {
90
        // SendMessages sends the target set of messages to the target peer.
91
        SendMessages(btcec.PublicKey, []lnwire.Message) error
92

93
        // BroadcastTransaction broadcasts a transaction with the target label.
94
        BroadcastTransaction(*wire.MsgTx, string) error
95

96
        // RegisterConfirmationsNtfn registers an intent to be notified once
97
        // txid reaches numConfs confirmations. We also pass in the pkScript as
98
        // the default light client instead needs to match on scripts created
99
        // in the block. If a nil txid is passed in, then not only should we
100
        // match on the script, but we should also dispatch once the
101
        // transaction containing the script reaches numConfs confirmations.
102
        // This can be useful in instances where we only know the script in
103
        // advance, but not the transaction containing it.
104
        //
105
        // TODO(roasbeef): could abstract further?
106
        RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte,
107
                numConfs, heightHint uint32,
108
                opts ...chainntnfs.NotifierOption,
109
        ) (*chainntnfs.ConfirmationEvent, error)
110

111
        // RegisterSpendNtfn registers an intent to be notified once the target
112
        // outpoint is successfully spent within a transaction. The script that
113
        // the outpoint creates must also be specified. This allows this
114
        // interface to be implemented by BIP 158-like filtering.
115
        RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte,
116
                heightHint uint32) (*chainntnfs.SpendEvent, error)
117
}
118

119
// stateQuery is used by outside callers to query the internal state of the
120
// state machine.
121
type stateQuery[Event any, Env Environment] struct {
122
        // CurrentState is a channel that will be sent the current state of the
123
        // state machine.
124
        CurrentState chan State[Event, Env]
125
}
126

127
// StateMachine represents an abstract FSM that is able to process new incoming
128
// events and drive a state machine to termination. This implementation uses
129
// type params to abstract over the types of events and environment. Events
130
// trigger new state transitions, that use the environment to perform some
131
// action.
132
//
133
// TODO(roasbeef): terminal check, daemon event execution, init?
134
type StateMachine[Event any, Env Environment] struct {
135
        cfg StateMachineCfg[Event, Env]
136

137
        log btclog.Logger
138

139
        // events is the channel that will be used to send new events to the
140
        // FSM.
141
        events chan Event
142

143
        // newStateEvents is an EventDistributor that will be used to notify
144
        // any relevant callers of new state transitions that occur.
145
        newStateEvents *fn.EventDistributor[State[Event, Env]]
146

147
        // stateQuery is a channel that will be used by outside callers to
148
        // query the internal state machine state.
149
        stateQuery chan stateQuery[Event, Env]
150

151
        gm   fn.GoroutineManager
152
        quit chan struct{}
153

154
        // startOnce and stopOnce are used to ensure that the state machine is
155
        // only started and stopped once.
156
        startOnce sync.Once
157
        stopOnce  sync.Once
158

159
        // running is a flag that indicates if the state machine is currently
160
        // running.
161
        running atomic.Bool
162
}
163

164
// ErrorReporter is an interface that's used to report errors that occur during
165
// state machine execution.
166
type ErrorReporter interface {
167
        // ReportError is a method that's used to report an error that occurred
168
        // during state machine execution.
169
        ReportError(err error)
170
}
171

172
// StateMachineCfg is a configuration struct that's used to create a new state
173
// machine.
174
type StateMachineCfg[Event any, Env Environment] struct {
175
        // ErrorReporter is used to report errors that occur during state
176
        // transitions.
177
        ErrorReporter ErrorReporter
178

179
        // Daemon is a set of adapters that will be used to bridge the FSM to
180
        // the daemon.
181
        Daemon DaemonAdapters
182

183
        // InitialState is the initial state of the state machine.
184
        InitialState State[Event, Env]
185

186
        // Env is the environment that the state machine will use to execute.
187
        Env Env
188

189
        // InitEvent is an optional event that will be sent to the state
190
        // machine as if it was emitted at the onset of the state machine. This
191
        // can be used to set up tracking state such as a txid confirmation
192
        // event.
193
        InitEvent fn.Option[DaemonEvent]
194

195
        // MsgMapper is an optional message mapper that can be used to map
196
        // normal wire messages into FSM events.
197
        MsgMapper fn.Option[MsgMapper[Event]]
198

199
        // CustomPollInterval is an optional custom poll interval that can be
200
        // used to set a quicker interval for tests.
201
        CustomPollInterval fn.Option[time.Duration]
202
}
203

204
// NewStateMachine creates a new state machine given a set of daemon adapters,
205
// an initial state, an environment, and an event to process as if emitted at
206
// the onset of the state machine. Such an event can be used to set up tracking
207
// state such as a txid confirmation event.
208
func NewStateMachine[Event any, Env Environment](
209
        cfg StateMachineCfg[Event, Env]) StateMachine[Event, Env] {
3✔
210

3✔
211
        return StateMachine[Event, Env]{
3✔
212
                cfg: cfg,
3✔
213
                log: log.WithPrefix(
3✔
214
                        fmt.Sprintf("FSM(%v):", cfg.Env.Name()),
3✔
215
                ),
3✔
216
                events:         make(chan Event, 1),
3✔
217
                stateQuery:     make(chan stateQuery[Event, Env]),
3✔
218
                gm:             *fn.NewGoroutineManager(),
3✔
219
                newStateEvents: fn.NewEventDistributor[State[Event, Env]](),
3✔
220
                quit:           make(chan struct{}),
3✔
221
        }
3✔
222
}
3✔
223

224
// Start starts the state machine. This will spawn a goroutine that will drive
225
// the state machine to completion.
226
func (s *StateMachine[Event, Env]) Start(ctx context.Context) {
3✔
227
        s.startOnce.Do(func() {
6✔
228
                _ = s.gm.Go(ctx, func(ctx context.Context) {
6✔
229
                        s.driveMachine(ctx)
3✔
230
                })
3✔
231

232
                s.running.Store(true)
3✔
233
        })
234
}
235

236
// Stop stops the state machine. This will block until the state machine has
237
// reached a stopping point.
UNCOV
238
func (s *StateMachine[Event, Env]) Stop() {
×
UNCOV
239
        s.stopOnce.Do(func() {
×
UNCOV
240
                close(s.quit)
×
UNCOV
241
                s.gm.Stop()
×
UNCOV
242

×
UNCOV
243
                s.running.Store(false)
×
UNCOV
244
        })
×
245
}
246

247
// SendEvent sends a new event to the state machine.
248
//
249
// TODO(roasbeef): bool if processed?
250
func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) {
3✔
251
        s.log.Debugf("Sending event %T", event)
3✔
252

3✔
253
        select {
3✔
254
        case s.events <- event:
3✔
255
        case <-ctx.Done():
×
256
                return
×
257
        case <-s.quit:
×
258
                return
×
259
        }
260
}
261

262
// CanHandle returns true if the target message can be routed to the state
263
// machine.
264
func (s *StateMachine[Event, Env]) CanHandle(msg msgmux.PeerMsg) bool {
3✔
265
        cfgMapper := s.cfg.MsgMapper
3✔
266
        return fn.MapOptionZ(cfgMapper, func(mapper MsgMapper[Event]) bool {
6✔
267
                return mapper.MapMsg(msg).IsSome()
3✔
268
        })
3✔
269
}
270

271
// Name returns the name of the state machine's environment.
272
func (s *StateMachine[Event, Env]) Name() string {
3✔
273
        return s.cfg.Env.Name()
3✔
274
}
3✔
275

276
// SendMessage attempts to send a wire message to the state machine. If the
277
// message can be mapped using the default message mapper, then true is
278
// returned indicating that the message was processed. Otherwise, false is
279
// returned.
280
func (s *StateMachine[Event, Env]) SendMessage(ctx context.Context,
281
        msg msgmux.PeerMsg) bool {
3✔
282

3✔
283
        // If we have no message mapper, then return false as we can't process
3✔
284
        // this message.
3✔
285
        if !s.cfg.MsgMapper.IsSome() {
3✔
286
                return false
×
287
        }
×
288

289
        s.log.DebugS(ctx, "Sending msg", "msg", lnutils.SpewLogClosure(msg))
3✔
290

3✔
291
        // Otherwise, try to map the message using the default message mapper.
3✔
292
        // If we can't extract an event, then we'll return false to indicate
3✔
293
        // that the message wasn't processed.
3✔
294
        var processed bool
3✔
295
        s.cfg.MsgMapper.WhenSome(func(mapper MsgMapper[Event]) {
6✔
296
                event := mapper.MapMsg(msg)
3✔
297

3✔
298
                event.WhenSome(func(event Event) {
6✔
299
                        s.SendEvent(ctx, event)
3✔
300

3✔
301
                        processed = true
3✔
302
                })
3✔
303
        })
304

305
        return processed
3✔
306
}
307

308
// CurrentState returns the current state of the state machine.
309
func (s *StateMachine[Event, Env]) CurrentState() (State[Event, Env], error) {
3✔
310
        query := stateQuery[Event, Env]{
3✔
311
                CurrentState: make(chan State[Event, Env], 1),
3✔
312
        }
3✔
313

3✔
314
        if !fn.SendOrQuit(s.stateQuery, query, s.quit) {
3✔
315
                return nil, ErrStateMachineShutdown
×
316
        }
×
317

318
        return fn.RecvOrTimeout(query.CurrentState, time.Second)
3✔
319
}
320

321
// StateSubscriber represents an active subscription to be notified of new
322
// state transitions.
323
type StateSubscriber[E any, F Environment] *fn.EventReceiver[State[E, F]]
324

325
// RegisterStateEvents registers a new event listener that will be notified of
326
// new state transitions.
327
func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[
328
        Event, Env] {
3✔
329

3✔
330
        subscriber := fn.NewEventReceiver[State[Event, Env]](10)
3✔
331

3✔
332
        // TODO(roasbeef): instead give the state and the input event?
3✔
333

3✔
334
        s.newStateEvents.RegisterSubscriber(subscriber)
3✔
335

3✔
336
        return subscriber
3✔
337
}
3✔
338

339
// RemoveStateSub removes the target state subscriber from the set of active
340
// subscribers.
341
func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[
342
        Event, Env]) {
3✔
343

3✔
344
        _ = s.newStateEvents.RemoveSubscriber(sub)
3✔
345
}
3✔
346

347
// IsRunning returns true if the state machine is currently running.
UNCOV
348
func (s *StateMachine[Event, Env]) IsRunning() bool {
×
UNCOV
349
        return s.running.Load()
×
UNCOV
350
}
×
351

352
// executeDaemonEvent executes a daemon event, which is a special type of event
353
// that can be emitted as part of the state transition function of the state
354
// machine. An error is returned if the type of event is unknown.
355
func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
356
        event DaemonEvent) error {
3✔
357

3✔
358
        switch daemonEvent := event.(type) {
3✔
359
        // This is a send message event, so we'll send the event, and also mind
360
        // any preconditions as well as post-send events.
361
        case *SendMsgEvent[Event]:
3✔
362
                sendAndCleanUp := func() error {
6✔
363
                        s.log.DebugS(ctx, "Sending message:",
3✔
364
                                btclog.Hex6("target", daemonEvent.TargetPeer.SerializeCompressed()),
3✔
365
                                "messages", lnutils.SpewLogClosure(daemonEvent.Msgs))
3✔
366

3✔
367
                        err := s.cfg.Daemon.SendMessages(
3✔
368
                                daemonEvent.TargetPeer, daemonEvent.Msgs,
3✔
369
                        )
3✔
370
                        if err != nil {
3✔
371
                                return fmt.Errorf("unable to send msgs: %w",
×
372
                                        err)
×
373
                        }
×
374

375
                        // If a post-send event was specified, then we'll funnel
376
                        // that back into the main state machine now as well.
377
                        return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll
6✔
378
                                launched := s.gm.Go(
3✔
379
                                        ctx, func(ctx context.Context) {
6✔
380
                                                s.log.DebugS(ctx, "Sending post-send event",
3✔
381
                                                        "event", lnutils.SpewLogClosure(event))
3✔
382

3✔
383
                                                s.SendEvent(ctx, event)
3✔
384
                                        },
3✔
385
                                )
386

387
                                if !launched {
3✔
388
                                        return ErrStateMachineShutdown
×
389
                                }
×
390

391
                                return nil
3✔
392
                        })
393
                }
394

395
                canSend := func() bool {
6✔
396
                        return fn.MapOptionZ(
3✔
397
                                daemonEvent.SendWhen,
3✔
398
                                func(pred SendPredicate) bool {
6✔
399
                                        return pred()
3✔
400
                                },
3✔
401
                        )
402
                }
403

404
                // If this doesn't have a SendWhen predicate, or if it's already
405
                // true, then we can just send it off right away.
406
                if !daemonEvent.SendWhen.IsSome() || canSend() {
6✔
407
                        return sendAndCleanUp()
3✔
408
                }
3✔
409

410
                // Otherwise, this has a SendWhen predicate, so we'll need
411
                // launch a goroutine to poll the SendWhen, then send only once
412
                // the predicate is true.
UNCOV
413
                launched := s.gm.Go(ctx, func(ctx context.Context) {
×
UNCOV
414
                        predicateTicker := time.NewTicker(
×
UNCOV
415
                                s.cfg.CustomPollInterval.UnwrapOr(pollInterval),
×
UNCOV
416
                        )
×
UNCOV
417
                        defer predicateTicker.Stop()
×
UNCOV
418

×
UNCOV
419
                        s.log.InfoS(ctx, "Waiting for send predicate to be true")
×
UNCOV
420

×
UNCOV
421
                        for {
×
UNCOV
422
                                select {
×
UNCOV
423
                                case <-predicateTicker.C:
×
UNCOV
424
                                        if canSend() {
×
UNCOV
425
                                                s.log.InfoS(ctx, "Send active predicate")
×
UNCOV
426

×
UNCOV
427
                                                err := sendAndCleanUp()
×
UNCOV
428
                                                if err != nil {
×
429
                                                        s.log.ErrorS(ctx, "Unable to send message", err)
×
430
                                                }
×
431

UNCOV
432
                                                return
×
433
                                        }
434

UNCOV
435
                                case <-ctx.Done():
×
UNCOV
436
                                        return
×
437
                                }
438
                        }
439
                })
440

UNCOV
441
                if !launched {
×
442
                        return ErrStateMachineShutdown
×
443
                }
×
444

UNCOV
445
                return nil
×
446

447
        // If this is a broadcast transaction event, then we'll broadcast with
448
        // the label attached.
449
        case *BroadcastTxn:
3✔
450
                s.log.DebugS(ctx, "Broadcasting txn",
3✔
451
                        "txid", daemonEvent.Tx.TxHash())
3✔
452

3✔
453
                err := s.cfg.Daemon.BroadcastTransaction(
3✔
454
                        daemonEvent.Tx, daemonEvent.Label,
3✔
455
                )
3✔
456
                if err != nil {
6✔
457
                        log.Errorf("unable to broadcast txn: %v", err)
3✔
458
                }
3✔
459

460
                return nil
3✔
461

462
        // The state machine has requested a new event to be sent once a
463
        // transaction spending a specified outpoint has confirmed.
464
        case *RegisterSpend[Event]:
3✔
465
                s.log.DebugS(ctx, "Registering spend",
3✔
466
                        "outpoint", daemonEvent.OutPoint)
3✔
467

3✔
468
                spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn(
3✔
469
                        &daemonEvent.OutPoint, daemonEvent.PkScript,
3✔
470
                        daemonEvent.HeightHint,
3✔
471
                )
3✔
472
                if err != nil {
3✔
473
                        return fmt.Errorf("unable to register spend: %w", err)
×
474
                }
×
475

476
                launched := s.gm.Go(ctx, func(ctx context.Context) {
6✔
477
                        for {
6✔
478
                                select {
3✔
479
                                case spend, ok := <-spendEvent.Spend:
3✔
480
                                        if !ok {
6✔
481
                                                return
3✔
482
                                        }
3✔
483

484
                                        // If there's a post-send event, then
485
                                        // we'll send that into the current
486
                                        // state now.
487
                                        postSpend := daemonEvent.PostSpendEvent
3✔
488
                                        postSpend.WhenSome(func(f SpendMapper[Event]) { //nolint:ll
6✔
489
                                                customEvent := f(spend)
3✔
490
                                                s.SendEvent(ctx, customEvent)
3✔
491
                                        })
3✔
492

493
                                        return
3✔
494

UNCOV
495
                                case <-ctx.Done():
×
UNCOV
496
                                        return
×
497
                                }
498
                        }
499
                })
500

501
                if !launched {
3✔
502
                        return ErrStateMachineShutdown
×
503
                }
×
504

505
                return nil
3✔
506

507
        // The state machine has requested a new event to be sent once a
508
        // specified txid+pkScript pair has confirmed.
UNCOV
509
        case *RegisterConf[Event]:
×
UNCOV
510
                s.log.DebugS(ctx, "Registering conf",
×
UNCOV
511
                        "txid", daemonEvent.Txid)
×
UNCOV
512

×
UNCOV
513
                var opts []chainntnfs.NotifierOption
×
UNCOV
514
                if daemonEvent.FullBlock {
×
UNCOV
515
                        opts = append(opts, chainntnfs.WithIncludeBlock())
×
UNCOV
516
                }
×
517

UNCOV
518
                numConfs := daemonEvent.NumConfs.UnwrapOr(1)
×
UNCOV
519
                confEvent, err := s.cfg.Daemon.RegisterConfirmationsNtfn(
×
UNCOV
520
                        &daemonEvent.Txid, daemonEvent.PkScript,
×
UNCOV
521
                        numConfs, daemonEvent.HeightHint, opts...,
×
UNCOV
522
                )
×
UNCOV
523
                if err != nil {
×
524
                        return fmt.Errorf("unable to register conf: %w", err)
×
525
                }
×
526

UNCOV
527
                launched := s.gm.Go(ctx, func(ctx context.Context) {
×
UNCOV
528
                        for {
×
UNCOV
529
                                select {
×
530
                                //nolint:ll
UNCOV
531
                                case conf, ok := <-confEvent.Confirmed:
×
UNCOV
532
                                        if !ok {
×
533
                                                return
×
534
                                        }
×
535

536
                                        // If there's a post-conf mapper, then
537
                                        // we'll send that into the current
538
                                        // state now.
UNCOV
539
                                        postConfMapper := daemonEvent.PostConfMapper
×
UNCOV
540
                                        postConfMapper.WhenSome(func(f ConfMapper[Event]) {
×
UNCOV
541
                                                customEvent := f(conf)
×
UNCOV
542
                                                s.SendEvent(ctx, customEvent)
×
UNCOV
543
                                        })
×
544

UNCOV
545
                                        return
×
546

547
                                case <-ctx.Done():
×
548
                                        return
×
549
                                }
550
                        }
551
                })
552

UNCOV
553
                if !launched {
×
554
                        return ErrStateMachineShutdown
×
555
                }
×
556

UNCOV
557
                return nil
×
558
        }
559

560
        return fmt.Errorf("unknown daemon event: %T", event)
×
561
}
562

563
// applyEvents applies a new event to the state machine. This will continue
564
// until no further events are emitted by the state machine. Along the way,
565
// we'll also ensure to execute any daemon events that are emitted.
566
func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
567
        currentState State[Event, Env], newEvent Event) (State[Event, Env],
568
        error) {
3✔
569

3✔
570
        eventQueue := fn.NewQueue(newEvent)
3✔
571

3✔
572
        // Given the next event to handle, we'll process the event, then add
3✔
573
        // any new emitted internal events to our event queue. This continues
3✔
574
        // until we reach a terminal state, or we run out of internal events to
3✔
575
        // process.
3✔
576
        //
3✔
577
        //nolint:ll
3✔
578
        for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() {
6✔
579
                err := fn.MapOptionZ(nextEvent, func(event Event) error {
6✔
580
                        s.log.DebugS(ctx, "Processing event",
3✔
581
                                "event", lnutils.SpewLogClosure(event))
3✔
582

3✔
583
                        // Apply the state transition function of the current
3✔
584
                        // state given this new event and our existing env.
3✔
585
                        transition, err := currentState.ProcessEvent(
3✔
586
                                event, s.cfg.Env,
3✔
587
                        )
3✔
588
                        if err != nil {
3✔
UNCOV
589
                                return err
×
UNCOV
590
                        }
×
591

592
                        newEvents := transition.NewEvents
3✔
593
                        err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error { //nolint:ll
6✔
594
                                // With the event processed, we'll process any
3✔
595
                                // new daemon events that were emitted as part
3✔
596
                                // of this new state transition.
3✔
597
                                for _, dEvent := range events.ExternalEvents {
6✔
598
                                        err := s.executeDaemonEvent(
3✔
599
                                                ctx, dEvent,
3✔
600
                                        )
3✔
601
                                        if err != nil {
3✔
602
                                                return err
×
603
                                        }
×
604
                                }
605

606
                                // Next, we'll add any new emitted events to our
607
                                // event queue.
608
                                //
609
                                //nolint:ll
610
                                for _, inEvent := range events.InternalEvent {
6✔
611
                                        s.log.DebugS(ctx, "Adding new internal event to queue",
3✔
612
                                                "event", lnutils.SpewLogClosure(inEvent))
3✔
613

3✔
614
                                        eventQueue.Enqueue(inEvent)
3✔
615
                                }
3✔
616

617
                                return nil
3✔
618
                        })
619
                        if err != nil {
3✔
620
                                return err
×
621
                        }
×
622

623
                        s.log.InfoS(ctx, "State transition",
3✔
624
                                btclog.Fmt("from_state", "%v", currentState),
3✔
625
                                btclog.Fmt("to_state", "%v", transition.NextState))
3✔
626

3✔
627
                        // With our events processed, we'll now update our
3✔
628
                        // internal state.
3✔
629
                        currentState = transition.NextState
3✔
630

3✔
631
                        // Notify our subscribers of the new state transition.
3✔
632
                        //
3✔
633
                        // TODO(roasbeef): will only give us the outer state?
3✔
634
                        //  * let FSMs choose which state to emit?
3✔
635
                        s.newStateEvents.NotifySubscribers(currentState)
3✔
636

3✔
637
                        return nil
3✔
638
                })
639
                if err != nil {
3✔
UNCOV
640
                        return currentState, err
×
UNCOV
641
                }
×
642
        }
643

644
        return currentState, nil
3✔
645
}
646

647
// driveMachine is the main event loop of the state machine. It accepts any new
648
// incoming events, and then drives the state machine forward until it reaches
649
// a terminal state.
650
func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
3✔
651
        s.log.DebugS(ctx, "Starting state machine")
3✔
652

3✔
653
        currentState := s.cfg.InitialState
3✔
654

3✔
655
        // Before we start, if we have an init daemon event specified, then
3✔
656
        // we'll handle that now.
3✔
657
        err := fn.MapOptionZ(s.cfg.InitEvent, func(event DaemonEvent) error {
6✔
658
                return s.executeDaemonEvent(ctx, event)
3✔
659
        })
3✔
660
        if err != nil {
3✔
661
                s.log.ErrorS(ctx, "Unable to execute init event", err)
×
662
                return
×
663
        }
×
664

665
        // We just started driving the state machine, so we'll notify our
666
        // subscribers of this starting state.
667
        s.newStateEvents.NotifySubscribers(currentState)
3✔
668

3✔
669
        for {
6✔
670
                select {
3✔
671
                // We have a new external event, so we'll drive the state
672
                // machine forward until we either run out of internal events,
673
                // or we reach a terminal state.
674
                case newEvent := <-s.events:
3✔
675
                        newState, err := s.applyEvents(
3✔
676
                                ctx, currentState, newEvent,
3✔
677
                        )
3✔
678
                        if err != nil {
3✔
UNCOV
679
                                s.cfg.ErrorReporter.ReportError(err)
×
UNCOV
680

×
UNCOV
681
                                s.log.ErrorS(ctx, "Unable to apply event", err)
×
UNCOV
682

×
UNCOV
683
                                // An error occurred, so we'll tear down the
×
UNCOV
684
                                // entire state machine as we can't proceed.
×
UNCOV
685
                                go s.Stop()
×
UNCOV
686

×
UNCOV
687
                                return
×
UNCOV
688
                        }
×
689

690
                        currentState = newState
3✔
691

692
                // An outside caller is querying our state, so we'll return the
693
                // latest state.
694
                case stateQuery := <-s.stateQuery:
3✔
695
                        if !fn.SendOrQuit(stateQuery.CurrentState, currentState, s.quit) { //nolint:ll
3✔
696
                                return
×
697
                        }
×
698

UNCOV
699
                case <-s.gm.Done():
×
UNCOV
700
                        return
×
701
                }
702
        }
703
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc