HI, I want to create a state machine for consensus in distributed systems, is there any abstractions like it is in Elixir genstatemachine.
For the very specific use case of creating a state machine compatible with a distributed consensus and/or replication algorithm like Raft, I would recommend thinking about starting small and simple instead of looking into creating a "complex" state machine right off-the-bat.
Over the last few weekends I've been working on a Raft implementation and I've found my design requirements for the state machine have evolved quite a bit over the entire development period. It started off with a partial implementation of a Key/Value store backed by a Arc<Mutex<HashMap<String, String>>>
, and eventually landed on something significantly less concrete.
If the focus of your work is Raft (or Paxos or some other consensus/replication algorithm), then it is probably reasonable to keep the neighbouring systems as lightweight as possible while focusing on what the boundary (i.e. the state machine API) looks like and the minimum state required to exhaust all the possible interactions at the boundaries.
As far as Raft (i.e. the consensus and replication layer) is concerned, the only responsibility a state machine has is that of gobbling up a log message whenever it is signalled that it is safe to do so. Hence exposing a simple "apply" API is a reasonable start.
pub trait StateMachine {
// Accept a single log message and apply it to the underlying
// state machine.
fn apply(&self, log: &[u8]) -> Result<(), Box<dyn Error>>;
}
Furthermore, you could have a naive concrete implementation backed by a Arc<Mutex<Vec<Vec<u8>>>>
that just appends the log to the end of the vector.
#[derive(Default)]
pub struct VecStateMachine {
// The ArcMutex provides shared mutability
// since we have an immutable reference in our StateMachine trait.
inner: Arc<Mutex<Vec<Vec<u8>>>>
}
impl StateMachine for VecStateMachine {
fn apply(&self, log: &[u8]) -> Result<(), Box<dyn Error>> {
self.inner.lock()?.push(log.to_vec());
Ok(())
}
}
Having a state accessor for inspection can also prove useful for debugging purposes.
impl StateMachine {
pub fn state(&self) -> Vec<Vec<u8>> {
self.inner.lock().unwrap().to_vec()
}
}
Assuming a message-passing Raft runtime, we can propagate the log messages from our connections (to a server) into the Raft layer and when the replication is complete, Raft proposes the state machine backend (i.e. VecStateMachine
) to apply the new log entry via a (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>)
.
And that's about as complicated as it gets when it comes to creating a basic state machine that just works with a consensus/replication algorithm.
On the other hand, if the focus of your work is to create or model state machines independent of the consensus/replication algorithm then there's a couple of interesting crates you might find useful: rust-fsm, bicoro.
Thank you for taking time to answer.
I want to build a mealy machine as it covers most of the fsm, but also include event driven changes.
as its custom consensus , but there is no easy existing abstraction.
What sort of abstractions are you looking for? Maybe if you describe the code you would like to write, we can suggest something that might be close.
Any event driven change to the State Machine will likely be facilitated by a communication channel (i.e. mpsc::Receiver
) so you may be able to wrap all the possible interactions in a fat channel struct
pub struct StateMachineMutator {
replication_complete_rx: mpsc::Receiver<Vec<u8>>,
make_a_specific_update_to_state_rx: mpsc::Receiver<SomeContext>,
...
}
or perhaps a receiver of multiple message variants:
pub enum Message {
ReplicationComplete,
UpdateSomeSpecificState(SomeContext),
....
}
pub struct StateMachineMutator {
on_message_received: mpsc::Receiver<Message>,
}
impl<M> StateMachineMutator
where
M: StateMachine + Clone
{
pub fn handle_message(&self, msg: Message, state_machine: M) -> Result<(), Box<dyn Error>>{
match msg {
Message::ReplicationComplete => {
// do something with your state machine.
},
Message::UpdateSomeSpecificState(context) => {
// update your state here.
}
}
Ok(())
}
pub async fn run(mut self, state_machine: M) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let message_rx = self.on_message_received;
while let Some(msg) = message_rx.recv() {
self.handle_message(msg, state_machine.clone())?;
}
}
}
This will allow you to represent any complex API your state machine provides as messages that your state machine understands and reacts in some way without actually describing what that way is.
Then in your driver function you can pass the sender of a (mpsc::Sender<Message>
, mpsc::Receiver<Message>
) pair to your consensus runtime, and the receiver to this mutator struct. Finally, spawn a non-blocking run()
by providing something that is impl StateMachine + Clone
.
Note: This is just a sketch for what the interaction could look like and I'm no rustc
so you may want to refactor it a bit if it doesn't work as is.
Also, please feel free to provide more information if you think I'm misinterpreting your question in some way.
This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.