UnixStream blocking on read even when messages present in socket

I am trying to create an async runtime and am currently testing whether the reactor works correctly. This is the test code:

    use std::{io::{Read, Write}, os::unix::net::UnixStream, pin::Pin, task::{Context, Poll}, thread};

    use crate::lib::reactor::IoEventType;

    use super::{Executor, Task};

    enum State {
        Step1,
        Step2,
        Step3,
        Step4,
        Done,
    }

    // TODO: Enforce that a compatible future must hold a key
    pub struct MultiStep {
        fd: UnixStream,
        state: State,
        key: Option<usize>
    }

    impl MultiStep {
        pub fn new(fd: UnixStream) -> Self {
            Self {
                fd,
                state: State::Step1,
                key: None
            }
        }
    }

    impl Future for MultiStep {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            use State::*;

            match self.state {
                Step1 => {
                    println!("Step 1 in progress...");
                    self.state = Step2;
                    cx.waker().wake_by_ref();  // Simulate readiness for next step
                    Poll::Pending
                }
                Step2 => {
                    println!("Step 2 in progress...");
                    self.state = Step3;
                    cx.waker().wake_by_ref();  // Simulate readiness again
                    Poll::Pending
                }
                Step3 => {
                    println!("Step 3 in progress...");
                    self.state = Step4;
                    let task: *const Task = cx.waker().data().cast();
                    if !task.is_aligned(){
                        panic!("Task is not aligned");
                    }

                    // Register Read event in reactor
                    unsafe {
                        let executor = (*task).executor.clone();
                        self.key = Some(executor.reactor.register(
                            &self.fd,
                            IoEventType::Readable,
                            cx.waker().clone()
                        ).unwrap());
                    }

                    Poll::Pending
                }
                Step4 => {
                    println!("Step 4 in progress...");
                    // Simulate a read operation
                    let mut buf = [0; 1024];
                    self.fd.read_exact(&mut buf).unwrap();

                    println!("Read data: {}", String::from_utf8_lossy(&buf));
                    self.state = Done;

                    let task: *const Task = cx.waker().data().cast();
                    if !task.is_aligned(){
                        panic!("Task is not aligned");
                    }
                    // Unregister the event
                    unsafe {
                        let executor = (*task).executor.clone();
                        executor.reactor.unregister(
                            self.key.unwrap(),
                            &self.fd
                        ).unwrap();
                    }

                    cx.waker().wake_by_ref();  // Simulate readiness again
                    Poll::Pending
                }
                Done => {
                    println!("All steps complete!");
                    Poll::Ready(())
                }
            }
        }
    }

    #[test]
    fn test_executor() {
        // Create a UnixStream for testing
        let (mut sender, receiver) = UnixStream::pair().unwrap();

        println!("Sender: {:?}", sender.local_addr().unwrap());
        println!("Receiver: {:?}", receiver.local_addr().unwrap());
        
        let future = Box::pin(MultiStep::new(receiver));
        let handle = thread::spawn(|| {
            Executor::init(future);
        });

        thread::sleep(std::time::Duration::from_secs(1));
        sender.write_all(b"Hello, world!").unwrap();
        println!("Data sent!");
        handle.join().unwrap();
    } 

When I run the test with cargo test -- --nocapture, I get output up to "Step 4 in progress...", but then it blocks indefinitely at self.fd.read_exact(...) without reading any data. This is the wait-and-wake method in the reactor:

    pub fn wait_and_wake(&self) -> Result<(), std::io::Error> {
        let mut events = Events::new();
        // TODO: Need to look into if timeout is required
        self.poller.wait(&mut events, None)?;

        let mut event_map = self.event_map.lock().unwrap();
        for event in events.iter() {
            if let Some(waker) = event_map.get(&event.key) {
                waker.wake_by_ref();
                event_map.remove(&event.key);
            }
        }

        Ok(())
    }

I have tried sending data multiple times in the test function by duplicating the line

    sender.write_all(b"Hello, world!").unwrap();

several times, so that some of the writes execute after we enter the Step 4 block. Yet it still blocks indefinitely. What is causing this issue and how do I fix it?

Link to repo: MercMayhem/rust-async-runtime on GitHub (testing branch)

You must not use Read::read_exact in code that utilizes non-blocking sockets. Instead, you should:

  1. Call Read::read to read some number of bytes.
  2. If you have all the bytes you need, exit.
  3. Otherwise, you register for readiness and wait.
  4. Once you receive readiness again, you go back to step 1.

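The problem with read_exact is that it has a loop internally around Read::read, but that loop skips step 3 and just keeps trying to read until the full number of bytes is available. In your test the buffer is 1024 bytes but each write sends only 13, so read_exact keeps waiting for the rest.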
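
The four steps above could be sketched roughly like this. It is only a sketch, assuming the socket has been switched to non-blocking mode with set_nonblocking(true); ReadProgress, read_available and demo are made-up names for illustration, not something from the runtime in question.

```rust
use std::io::{self, ErrorKind, Read, Write};
use std::os::unix::net::UnixStream;

/// Result of one read attempt (hypothetical helper type for this sketch).
#[derive(Debug, PartialEq)]
enum ReadProgress {
    /// The buffer is full: we have all the bytes we need (step 2).
    Complete,
    /// No more data right now: register for readiness and return `Poll::Pending` (step 3).
    WouldBlock,
    /// The peer closed the connection before the buffer was filled.
    Eof,
}

/// Reads whatever is currently available into `buf[*filled..]` without blocking.
/// Assumes `stream` is in non-blocking mode.
fn read_available(
    stream: &mut UnixStream,
    buf: &mut [u8],
    filled: &mut usize,
) -> io::Result<ReadProgress> {
    while *filled < buf.len() {
        match stream.read(&mut buf[*filled..]) {
            Ok(0) => return Ok(ReadProgress::Eof),
            Ok(n) => *filled += n, // step 1: got some bytes, try for more
            Err(e) if e.kind() == ErrorKind::WouldBlock => return Ok(ReadProgress::WouldBlock),
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(ReadProgress::Complete)
}

/// Small demonstration on a socket pair; returns the bytes once the buffer is full.
fn demo() -> io::Result<Vec<u8>> {
    let (mut sender, mut receiver) = UnixStream::pair()?;
    receiver.set_nonblocking(true)?;

    let mut buf = [0u8; 26];
    let mut filled = 0;

    // Only half of the expected bytes arrive: we keep them, then are told to wait.
    sender.write_all(b"Hello, world!")?;
    assert_eq!(read_available(&mut receiver, &mut buf, &mut filled)?, ReadProgress::WouldBlock);

    // Once the rest arrives (in a real future: after the reactor wakes us), we finish.
    sender.write_all(b"Hello, world!")?;
    assert_eq!(read_available(&mut receiver, &mut buf, &mut filled)?, ReadProgress::Complete);
    Ok(buf.to_vec())
}
```

Inside a Future::poll, the WouldBlock case is where you would register with the reactor and return Poll::Pending, keeping buf and filled as fields of the future so that progress survives between polls.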

This isn’t the bug you asked for help with, but in Step3 and Step4, where you cast cx.waker().data() to *const Task, you are assuming that the provided waker is one of your wakers. It is unsound to do this. You could make this sound with a “type” check by comparing the waker’s vtable, but that is still not ideal: some future combinators may insert their own waker wrappers to know which child future woke. The most robust solution is to remember the reactor in the future, and treat information you can get from the waker as an optional optimization only.
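
One way "remember the reactor in the future" might look is below. This is a rough sketch, not the runtime's actual API: Reactor here is a tiny stand-in that only stores wakers, and WaitForEvent, fire and Flag are hypothetical names. The point is just that poll reaches the reactor through a field instead of through the waker's data pointer.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Stand-in for the runtime's reactor (hypothetical; the real one wraps a poller).
#[derive(Default)]
struct Reactor {
    next_key: AtomicUsize,
    wakers: Mutex<HashMap<usize, Waker>>,
}

impl Reactor {
    /// Stores `waker` and returns the key it was stored under.
    fn register(&self, waker: Waker) -> usize {
        let key = self.next_key.fetch_add(1, Ordering::Relaxed);
        self.wakers.lock().unwrap().insert(key, waker);
        key
    }

    /// Wakes and forgets the waker stored under `key`, as an I/O event would.
    fn fire(&self, key: usize) {
        // Take the waker out first so the lock is released before waking.
        let waker = self.wakers.lock().unwrap().remove(&key);
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

/// A future that carries its own reactor handle.
struct WaitForEvent {
    reactor: Arc<Reactor>,
    key: Option<usize>,
}

impl WaitForEvent {
    fn new(reactor: Arc<Reactor>) -> Self {
        Self { reactor, key: None }
    }
}

impl Future for WaitForEvent {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        match self.key {
            None => {
                // Works with any waker: nothing is assumed about what it points to.
                let key = self.reactor.register(cx.waker().clone());
                self.key = Some(key);
                Poll::Pending
            }
            // Simplification: a real future would re-check the fd here.
            Some(_) => Poll::Ready(()),
        }
    }
}

/// Waker for experiments that only records that it was woken.
struct Flag(AtomicBool);

impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}
```

The executor would create the Arc<Reactor> once and hand a clone to every I/O future it constructs, e.g. WaitForEvent::new(reactor.clone()).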

Hi alice. I liked reading your blogs. I should have read the method's description in the documentation more carefully lmao.

Hi, kpreid. I did struggle quite a bit trying to figure out how to pass the reactor to a struct that implements Future. Could you share a code snippet of how I can initialize a struct with the reactor as a field in it?

I'm not sure I completely understand the point about waker wrappers. Could you provide an example of future combinators inserting their own waker wrappers, and example code for the type check?
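
For illustration, a vtable "type" check and a wrapping waker might look roughly like the sketch below. Everything here is hypothetical: OUR_VTABLE uses no-op functions in place of a real runtime's clone/wake/drop logic, and Wrapper imitates what a combinator could do to learn which child woke (FuturesUnordered from the futures crate is, as far as I know, one combinator that hands children its own wakers). It assumes a toolchain where Waker::vtable is stable (Rust 1.83 or later, the same as Waker::data).

```rust
use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable, Wake, Waker};

// Hypothetical vtable for "our" runtime's wakers. The functions are no-ops
// here; a real runtime would manage a `Task` pointer in them.
unsafe fn vt_clone(data: *const ()) -> RawWaker {
    RawWaker::new(data, &OUR_VTABLE)
}
unsafe fn vt_noop(_data: *const ()) {}

static OUR_VTABLE: RawWakerVTable = RawWakerVTable::new(vt_clone, vt_noop, vt_noop, vt_noop);

/// Builds one of "our" wakers (with a null data pointer, for the sketch only).
fn our_waker() -> Waker {
    // SAFETY: none of the vtable functions dereference the data pointer,
    // so the `RawWaker` contract holds trivially.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &OUR_VTABLE)) }
}

/// The "type" check: only if this returns true is it reasonable to treat
/// `waker.data()` as a pointer to our own task type.
fn is_our_waker(waker: &Waker) -> bool {
    std::ptr::eq(waker.vtable(), &OUR_VTABLE)
}

/// What a combinator might do: wrap the waker it was given in its own type.
struct Wrapper(Waker);

impl Wake for Wrapper {
    fn wake(self: Arc<Self>) {
        // A real combinator would also note which child woke before forwarding.
        self.0.wake_by_ref();
    }
}

/// Returns (check on our waker, check on the same waker once wrapped).
fn demo() -> (bool, bool) {
    let ours = our_waker();
    let direct = is_our_waker(&ours);
    let wrapped = Waker::from(Arc::new(Wrapper(ours)));
    (direct, is_our_waker(&wrapped))
}
```

Once the waker is wrapped, the check fails even though the task is still running on your executor, which is why the check alone cannot be the only route to the reactor.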

Tokio passes the context via a thread-local rather than the Waker. There's no good way to pass it through the waker.
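
A thread-local approach in that spirit could be sketched like this. It is not Tokio's actual implementation; Reactor, CURRENT_REACTOR, enter and current_reactor are hypothetical names, and the sketch does not restore the previous value if the closure panics.

```rust
use std::cell::RefCell;
use std::sync::Arc;

/// Stand-in reactor (hypothetical); `name` only exists to tell instances apart.
struct Reactor {
    name: &'static str,
}

thread_local! {
    /// The reactor of the runtime currently running on this thread, if any.
    static CURRENT_REACTOR: RefCell<Option<Arc<Reactor>>> = RefCell::new(None);
}

/// Runs `f` with `reactor` installed as this thread's current reactor,
/// then puts back whatever was installed before.
fn enter<R>(reactor: Arc<Reactor>, f: impl FnOnce() -> R) -> R {
    let previous = CURRENT_REACTOR.with(|slot| slot.borrow_mut().replace(reactor));
    let result = f();
    CURRENT_REACTOR.with(|slot| *slot.borrow_mut() = previous);
    result
}

/// What a future's `poll` could call instead of casting `cx.waker().data()`.
/// Returns `None` when called outside of `enter`.
fn current_reactor() -> Option<Arc<Reactor>> {
    CURRENT_REACTOR.with(|slot| slot.borrow().clone())
}

/// Returns the reactor name seen inside `enter`, and whether one is still
/// visible after `enter` has returned.
fn demo() -> (Option<&'static str>, bool) {
    let reactor = Arc::new(Reactor { name: "main" });
    let inside = enter(reactor, || current_reactor().map(|r| r.name));
    (inside, current_reactor().is_some())
}
```

The executor would wrap its run loop in enter(...), so any future polled on that thread can find the reactor no matter which waker it was handed.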
