I am trying to create an async runtime. Currently trying to test and see if reactor is working correctly. This is the test code:
use std::{io::{Read, Write}, os::unix::net::UnixStream, pin::Pin, task::{Context, Poll}, thread};
use crate::lib::reactor::IoEventType;
use super::{Executor, Task};
enum State {
Step1,
Step2,
Step3,
Step4,
Done,
}
// TODO: Enforce that a compatible future must hold a key
pub struct MultiStep {
fd: UnixStream,
state: State,
key: Option<usize>
}
impl MultiStep {
pub fn new(fd: UnixStream) -> Self {
Self {
fd,
state: State::Step1,
key: None
}
}
}
impl Future for MultiStep {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use State::*;
match self.state {
Step1 => {
println!("Step 1 in progress...");
self.state = Step2;
cx.waker().wake_by_ref(); // Simulate readiness for next step
Poll::Pending
}
Step2 => {
println!("Step 2 in progress...");
self.state = Step3;
cx.waker().wake_by_ref(); // Simulate readiness again
Poll::Pending
}
Step3 => {
println!("Step 3 in progress...");
self.state = Step4;
let task: *const Task = cx.waker().data().cast();
if !task.is_aligned(){
panic!("Task is not aligned");
}
// Register Read event in reactor
unsafe {
let executor = (*task).executor.clone();
self.key = Some(executor.reactor.register(
&self.fd,
IoEventType::Readable,
cx.waker().clone()
).unwrap());
}
Poll::Pending
}
Step4 => {
println!("Step 4 in progress...");
// Simulate a read operation
let mut buf = [0; 1024];
self.fd.read_exact(&mut buf).unwrap();
println!("Read data: {}", String::from_utf8_lossy(&buf));
self.state = Done;
let task: *const Task = cx.waker().data().cast();
if !task.is_aligned(){
panic!("Task is not aligned");
}
// Unregister the event
unsafe {
let executor = (*task).executor.clone();
executor.reactor.unregister(
self.key.unwrap(),
&self.fd
).unwrap();
}
cx.waker().wake_by_ref(); // Simulate readiness again
Poll::Pending
}
Done => {
println!("All steps complete!");
Poll::Ready(())
}
}
}
}
#[test]
fn test_executor() {
// Create a UnixStream for testing
let (mut sender, receiver) = UnixStream::pair().unwrap();
println!("Sender: {:?}", sender.local_addr().unwrap());
println!("Receiver: {:?}", receiver.local_addr().unwrap());
let future = Box::pin(MultiStep::new(receiver));
let handle = thread::spawn(|| {
Executor::init(future);
});
thread::sleep(std::time::Duration::from_secs(1));
sender.write_all(b"Hello, world!").unwrap();
println!("Data sent!");
handle.join().unwrap();
}
when running the test with cargo test -- --nocapture
, I am getting output till "Step 4 is in progress" but it is blocking indefinitely and not reading data at self.fd.read_exact(...)
. This is the wait and wake up method in the reactor:
pub fn wait_and_wake(&self) -> Result<(), std::io::Error> {
let mut events = Events::new();
// TODO: Need to look into if timeout is required
self.poller.wait(&mut events, None)?;
let mut event_map = self.event_map.lock().unwrap();
for event in events.iter() {
if let Some(waker) = event_map.get(&event.key) {
waker.wake_by_ref();
event_map.remove(&event.key);
}
}
Ok(())
}
I have tried sending data multiple times in the test function by just duplicating the line
sender.write_all(b"Hello, world!").unwrap();
multiple times so that it executes after we enter the step 4 block. Yet, it is still blocked indefinitely. What is causing this issue and how do I fix it?
link to repo: GitHub - MercMayhem/rust-async-runtime at testing