Wrap `std::io::Read` into `futures::AsyncRead`

Hello everyone, I'm trying to wrap `R: Read` into `AsyncReader<R>: futures_io::AsyncRead`.

I now realize what a bad idea it is...

Here is my final code, if you want to have a look (it's a lot of lines):

```rust
// Dependencies (Cargo.toml): futures-io, parking_lot
use std::{
    collections::VecDeque,
    io::{self, ErrorKind, Read},
    marker::PhantomData,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};

use futures_io::AsyncRead;
use parking_lot::{Condvar, Mutex};

const READER_BUF_CAPACITY: usize = 4096;

#[derive(Debug, Default)]
struct Shared {
    state: Mutex<SharedState>,
    cvar: Condvar,
}

/// Shared state between the async handle and the background thread.
#[derive(Debug)]
struct SharedState {
    buf: VecDeque<u8>,
    /// The thread has stopped (EOF or fatal error); no more data will arrive.
    is_eof: bool,
    err: Option<io::Error>,
    waker: Option<Waker>,
    /// Set by `Drop` under the lock, so the thread cannot miss the shutdown.
    closed: bool,
}

impl Default for SharedState {
    fn default() -> Self {
        Self {
            buf: VecDeque::with_capacity(READER_BUF_CAPACITY),
            is_eof: false,
            err: None,
            waker: None,
            closed: false,
        }
    }
}

/// Async adapter around a blocking reader, backed by a dedicated thread.
///
/// Not `Clone`: clones would share one `waker` slot and overwrite each other's wakers.
#[derive(Debug)]
pub struct AsyncReader<R> {
    shared: Arc<Shared>,
    // The reader lives on the background thread, so the handle never owns an `R`.
    _phantom: PhantomData<fn() -> R>,
}

impl<R> AsyncReader<R> {
    /// Wrap a blocking reader into an async reader.
    pub fn new(mut reader: R) -> Self
    where
        R: Read + Send + 'static,
    {
        let shared = Arc::new(Shared::default());

        // Background thread: performs blocking reads and pushes the bytes into `shared.buf`.
        thread::spawn({
            let shared = Arc::clone(&shared);
            move || {
                let Shared { state, cvar } = &*shared;
                let mut read_buf = [0u8; READER_BUF_CAPACITY];

                loop {
                    let mut guard = state.lock();
                    // Sleep while the buffer is full; `poll_read` and `Drop` notify us.
                    while guard.buf.len() >= READER_BUF_CAPACITY && !guard.closed {
                        cvar.wait(&mut guard);
                    }
                    if guard.closed {
                        break;
                    }
                    let available = READER_BUF_CAPACITY - guard.buf.len();
                    // Never hold the lock across a blocking read.
                    drop(guard);

                    let res = reader.read(&mut read_buf[..available]);

                    let mut guard = state.lock();
                    match res {
                        Ok(0) => guard.is_eof = true,
                        Ok(n) => guard.buf.extend(&read_buf[..n]),
                        // EINTR: just retry the read.
                        Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                        // Non-blocking source with nothing ready: back off briefly
                        // (cut short when `poll_read` or `Drop` notifies the condvar).
                        Err(e) if e.kind() == ErrorKind::WouldBlock => {
                            cvar.wait_for(&mut guard, Duration::from_secs(1));
                            continue;
                        }
                        // Fatal error: report it once, then behave like EOF.
                        Err(e) => {
                            guard.err = Some(e);
                            guard.is_eof = true;
                        }
                    }
                    // Wake the task after every state change, so no wakeup is lost.
                    if let Some(waker) = guard.waker.take() {
                        waker.wake();
                    }
                    if guard.is_eof {
                        break;
                    }
                }
            }
        });

        AsyncReader {
            shared,
            _phantom: PhantomData,
        }
    }
}

impl<R> AsyncRead for AsyncReader<R>
where
    R: Read + Send + 'static,
{
    /// Reads up to `out_buf.len()` bytes into `out_buf`.
    /// Returns `Poll::Ready(Ok(0))` on EOF.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        out_buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let Shared { state, cvar } = &*self.shared;
        let mut state = state.lock();

        // Data is buffered: copy out as much as fits (from both halves of the
        // ring buffer), drop it from the queue, and return Ready.
        if !state.buf.is_empty() {
            let n = out_buf.len().min(state.buf.len());
            let (front, back) = state.buf.as_slices();
            let from_front = n.min(front.len());
            out_buf[..from_front].copy_from_slice(&front[..from_front]);
            out_buf[from_front..n].copy_from_slice(&back[..n - from_front]);
            state.buf.drain(..n);
            // Space was freed: wake the thread if it is waiting on a full buffer.
            cvar.notify_one();
            return Poll::Ready(Ok(n));
        }

        // Buffer empty: report a stored error once, then EOF.
        if let Some(e) = state.err.take() {
            return Poll::Ready(Err(e));
        }
        if state.is_eof {
            return Poll::Ready(Ok(0));
        }

        // No data yet: register the waker. This happens under the same lock the
        // thread holds when it pushes data and takes the waker, so no wakeup is lost.
        if state
            .waker
            .as_ref()
            .is_none_or(|w| !w.will_wake(cx.waker()))
        {
            state.waker = Some(cx.waker().clone());
        }

        // Cut short a `WouldBlock` back-off in the thread, if any.
        cvar.notify_one();

        Poll::Pending
    }
}

impl<R> Drop for AsyncReader<R> {
    fn drop(&mut self) {
        // Set the flag under the lock so the thread can't check it, miss this
        // notification, and then wait forever. A thread blocked inside
        // `reader.read` still only exits once that read returns.
        self.shared.state.lock().closed = true;
        self.shared.cvar.notify_one();
    }
}
```

There are probably many bugs in my code, such as wakers that are never woken...

I'm doing this because I want to avoid `tokio::fs::File`, and I may want to try other reactors in the future.

And I'm too burnt out to also wrap `W: Write` into `AsyncWriter<W>: futures_io::AsyncWrite`.
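For the write side, here is a sketch of the same thread-plus-buffer design, using only `std` so it compiles without extra crates, and assuming a single task drives the writer. `AsyncWriter`, `CAP`, and `block_on` are hypothetical names; `poll_write` and `poll_flush` are inherent methods with the same signatures as `futures_io::AsyncWrite`, and `poll_close` is left out. Two rules carry most of the correctness: the task checks state and stores its waker under the same lock the thread takes before waking, and `poll_flush` is ready only when the buffer is empty and no write is in flight.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::io::{self, Write};
use std::pin::{pin, Pin};
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, JoinHandle, Thread};

const CAP: usize = 4096; // hypothetical buffer limit

#[derive(Default)]
struct State {
    buf: VecDeque<u8>,      // accepted by poll_write, not yet taken by the thread
    in_flight: bool,        // the thread is inside a blocking write + flush
    err: Option<io::Error>, // I/O error, reported by the next poll
    waker: Option<Waker>,
    closed: bool,           // set by Drop
}

pub struct AsyncWriter {
    shared: Arc<(Mutex<State>, Condvar)>,
}

impl AsyncWriter {
    /// Spawns the writer thread; joining the handle returns the writer after drop.
    pub fn new<W: Write + Send + 'static>(mut writer: W) -> (Self, JoinHandle<W>) {
        let shared = Arc::new((Mutex::new(State::default()), Condvar::new()));
        let bg = Arc::clone(&shared);
        let handle = thread::spawn(move || loop {
            let (lock, cvar) = &*bg;
            let mut st = lock.lock().unwrap();
            while st.buf.is_empty() && !st.closed {
                st = cvar.wait(st).unwrap();
            }
            if st.buf.is_empty() { return writer; } // closed and fully drained
            let chunk: Vec<u8> = st.buf.drain(..).collect();
            st.in_flight = true;
            drop(st); // never hold the lock across blocking I/O
            let res = writer.write_all(&chunk).and_then(|()| writer.flush());
            let mut st = lock.lock().unwrap();
            st.in_flight = false;
            if let Err(e) = res { st.err = Some(e); }
            // Progress was made: wake the task so it polls again.
            if let Some(w) = st.waker.take() { w.wake(); }
        });
        (AsyncWriter { shared }, handle)
    }

    pub fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, data: &[u8]) -> Poll<io::Result<usize>> {
        let (lock, cvar) = &*self.shared;
        let mut st = lock.lock().unwrap();
        if let Some(e) = st.err.take() { return Poll::Ready(Err(e)); }
        let n = (CAP - st.buf.len()).min(data.len());
        if n == 0 && !data.is_empty() {
            // Buffer full: the write already in progress will wake us.
            st.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }
        st.buf.extend(&data[..n]);
        cvar.notify_one();
        Poll::Ready(Ok(n))
    }

    pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut st = self.shared.0.lock().unwrap();
        if let Some(e) = st.err.take() { return Poll::Ready(Err(e)); }
        if st.buf.is_empty() && !st.in_flight { return Poll::Ready(Ok(())); }
        st.waker = Some(cx.waker().clone()); // a write is pending and will wake us
        Poll::Pending
    }
}

impl Drop for AsyncWriter {
    fn drop(&mut self) {
        let (lock, cvar) = &*self.shared;
        lock.lock().unwrap().closed = true; // under the lock: the thread can't miss it
        cvar.notify_one();
    }
}

struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) { self.0.unpark(); }
}

/// Minimal executor for the demo: poll, park until woken, repeat.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}
```

Returning the `JoinHandle` is a design choice of this sketch: it lets the caller recover the writer after dropping the handle, and makes "everything has been written" observable by joining.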


Maybe interacting with the OS directly is the correct way.

I need to stop wasting time on this, and I strongly suspect I'm going about it the wrong way.

I wonder:

  1. Why is my attempt so painful? Should I be using something like `AsFd` instead?
  2. Are there any crates that do this wrapping (for `futures_io`)?
  3. Is it true that I have to implement a simple reactor to avoid losing wakers?
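On question 3, a sketch suggesting that a reactor is not needed just to avoid lost wakeups: it is enough for the future to check for the result and store its waker while holding the same lock the worker thread takes to publish the result and take the waker. This uses only `std`; `run_on_thread`, `ThreadTask`, and `block_on` are hypothetical names, and it spawns one thread per call instead of using a pool.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Shared between the worker thread and the future.
struct Slot<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

/// Future that resolves to the return value of a closure run on its own thread.
pub struct ThreadTask<T> {
    slot: Arc<Mutex<Slot<T>>>,
}

/// Hypothetical helper: run the blocking closure `f` on a new thread.
pub fn run_on_thread<T, F>(f: F) -> ThreadTask<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let slot = Arc::new(Mutex::new(Slot { value: None, waker: None }));
    let worker = Arc::clone(&slot);
    thread::spawn(move || {
        let value = f();
        // Publish the value and take the waker in one critical section...
        let mut s = worker.lock().unwrap();
        s.value = Some(value);
        let waker = s.waker.take();
        drop(s);
        // ...then wake outside the lock.
        if let Some(w) = waker {
            w.wake();
        }
    });
    ThreadTask { slot }
}

impl<T> Future for ThreadTask<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut s = self.slot.lock().unwrap();
        if let Some(value) = s.value.take() {
            return Poll::Ready(value);
        }
        // Still under the lock: the worker either already published (handled
        // above) or will find this waker when it does. Nothing slips in between.
        s.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor for the demo: poll, park until woken, repeat.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}
```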

Thanks a loooooooooooooooot! 🙏

I think you are looking for the `blocking` crate.


Thanks!

`blocking` uses a state machine to keep things maintainable; it's worth studying!
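A rough sketch of the state-machine idea, loosely following the reply above and not taken from `blocking`'s actual code: the adapter is either `Idle` (it owns the reader and a buffer) or `Reading` (a worker thread has taken both for exactly one `read` call and hands them back through a shared slot). Because ownership moves with the state, there is no long-lived thread, condvar, or shutdown flag. `Unblocked`, `State`, `Slot`, and `block_on` are hypothetical names; it spawns a thread per read for simplicity, whereas `blocking` runs work on a thread pool, and it requires `R: Unpin` so `poll_read` can reassign the state through `Pin<&mut Self>`.

```rust
use std::future::Future;
use std::io::{self, Read};
use std::mem;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// What a finished background read hands back: reader, buffer, and result.
type Done<R> = (R, Vec<u8>, io::Result<usize>);
/// Filled by the worker; the task's waker is stored under the same lock.
type Slot<R> = Arc<Mutex<(Option<Done<R>>, Option<Waker>)>>;

/// At any moment the adapter is in exactly one of these states.
enum State<R> {
    /// We own the reader; `buf[pos..len]` holds bytes not yet handed out.
    Idle { reader: R, buf: Vec<u8>, pos: usize, len: usize },
    /// A worker thread owns the reader and buffer for one `read` call.
    Reading(Slot<R>),
    /// Placeholder while the current state is moved out.
    Empty,
}

pub struct Unblocked<R> {
    state: State<R>,
}

impl<R: Read + Send + Unpin + 'static> Unblocked<R> {
    pub fn new(reader: R) -> Self {
        let buf = vec![0; 4096];
        Unblocked { state: State::Idle { reader, buf, pos: 0, len: 0 } }
    }

    /// Same signature as `futures_io::AsyncRead::poll_read`.
    pub fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, out: &mut [u8]) -> Poll<io::Result<usize>> {
        loop {
            match mem::replace(&mut self.state, State::Empty) {
                // Bytes left over from the last read: hand them out directly.
                State::Idle { reader, buf, pos, len } if pos < len => {
                    let n = out.len().min(len - pos);
                    out[..n].copy_from_slice(&buf[pos..pos + n]);
                    self.state = State::Idle { reader, buf, pos: pos + n, len };
                    return Poll::Ready(Ok(n));
                }
                // Buffer drained: lend reader and buffer to a thread for one read.
                State::Idle { mut reader, mut buf, .. } => {
                    let slot: Slot<R> = Arc::new(Mutex::new((None, None)));
                    let worker = Arc::clone(&slot);
                    thread::spawn(move || {
                        let res = reader.read(&mut buf);
                        let mut s = worker.lock().unwrap();
                        s.0 = Some((reader, buf, res));
                        if let Some(w) = s.1.take() { w.wake(); }
                    });
                    self.state = State::Reading(slot);
                }
                State::Reading(slot) => {
                    let mut s = slot.lock().unwrap();
                    let Some((reader, buf, res)) = s.0.take() else {
                        // Registered under the worker's lock, so its wake can't be missed.
                        s.1 = Some(cx.waker().clone());
                        drop(s);
                        self.state = State::Reading(slot);
                        return Poll::Pending;
                    };
                    drop(s);
                    let len = *res.as_ref().unwrap_or(&0);
                    self.state = State::Idle { reader, buf, pos: 0, len };
                    match res {
                        Ok(0) => return Poll::Ready(Ok(0)), // EOF
                        Ok(_) => {} // loop again: the first `Idle` arm copies the data out
                        Err(e) if e.kind() == io::ErrorKind::Interrupted => {} // retry
                        Err(e) => return Poll::Ready(Err(e)),
                    }
                }
                State::Empty => unreachable!("the state is restored before every return"),
            }
        }
    }
}

struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) { self.0.unpark(); }
}

/// Minimal executor for the demo: poll, park until woken, repeat.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}
```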


I've finished my original implementation and fixed its bugs. In the end, though, I think I should just embrace `tokio::fs::File`...
