Hello everyone, I'm trying to wrap R: Read into AsyncReader<R>: futures_io::AsyncRead.
I now realize what a bad idea that was...
Here is my final code, if you want to have a look (it is quite long):
use std::{
collections::VecDeque,
io::{self, ErrorKind, Read},
marker::PhantomData,
pin::Pin,
sync::Arc,
task::{Context, Poll, Waker},
thread,
time::Duration,
};
use futures_io::AsyncRead;
use parking_lot::{Condvar, Mutex};
/// Maximum number of bytes held in the queue between the background reader
/// thread and the async handle; also the initial capacity of that queue.
const READER_BUF_CAPACITY: usize = 4096;
/// Synchronization bundle held in an `Arc` by both the async handle and the
/// background reader thread.
#[derive(Debug, Default)]
struct Shared {
/// Mutex-protected buffer, status flags and registered waker.
state: Mutex<SharedState>,
/// Condition variable the background thread parks on; `poll_read` and the
/// handle's `Drop` impl notify it.
cvar: Condvar,
}
/// Shared state between the async handle and the background thread.
///
/// Every field is read and written only while holding the `Shared::state` mutex.
#[derive(Debug)]
struct SharedState {
/// Bytes the background thread has read that `poll_read` has not yet handed out.
buf: VecDeque<u8>,
/// Once this is true and `buf` is empty, `poll_read` reports end of stream (`Ok(0)`).
is_eof: bool,
/// Non-retryable read error stored by the background thread; `poll_read`
/// takes it (returning it once) after `buf` has been drained.
err: Option<io::Error>,
/// Waker registered by `poll_read` when it returns `Pending`; the background
/// thread takes and wakes it when there is something to report.
waker: Option<Waker>,
}
impl Default for SharedState {
    /// Builds the initial state: an empty queue pre-sized to
    /// `READER_BUF_CAPACITY`, no EOF, no stored error and no registered waker.
    fn default() -> Self {
        let buf = VecDeque::with_capacity(READER_BUF_CAPACITY);
        Self {
            buf,
            is_eof: false,
            err: None,
            waker: None,
        }
    }
}
/// Async adapter around a blocking reader.
///
/// The handle itself only holds the state shared with a background thread;
/// the reader `R` is moved into that thread by `new`, which is why `R` appears
/// here only as `PhantomData`.
///
/// Cloning a handle clones the inner `Arc`, so all clones consume from the
/// same byte queue and share its single waker slot.
#[derive(Clone, Debug)]
pub struct AsyncReader<R> {
// State shared with the background reader thread.
shared: Arc<Shared>,
// Ties the handle to the reader type without storing a reader.
_phantom: PhantomData<R>,
}
impl<R> AsyncReader<R> {
    /// Wraps a blocking [`Read`] so it can be consumed through `AsyncRead`.
    ///
    /// `reader` is moved into a detached background thread that performs the
    /// blocking `read` calls and pushes the bytes into a bounded queue of at
    /// most `READER_BUF_CAPACITY` bytes. The thread terminates when
    ///
    /// * the reader reports end of input (`Ok(0)`),
    /// * the reader fails with an error other than `Interrupted`/`WouldBlock`, or
    /// * every `AsyncReader` handle has been dropped (noticed the next time the
    ///   thread goes around its loop; a thread that is blocked inside
    ///   `reader.read` cannot be interrupted and only notices after that call
    ///   returns).
    ///
    /// After a fatal error the stream is also marked finished, so the error is
    /// delivered exactly once and subsequent reads report end of stream instead
    /// of waiting on a thread that no longer exists.
    pub fn new(mut reader: R) -> Self
    where
        R: Read + Send + 'static,
    {
        // Upper bound on every park of the background thread. A bounded wait
        // guarantees that the "all handles dropped" check at the top of the
        // loop is re-evaluated even if a notification raced with the final
        // reference-count decrement.
        const PARK_TIMEOUT: Duration = Duration::from_secs(1);

        let shared = Arc::new(Shared::default());
        // Spawn the background thread that performs the blocking reads and
        // pushes the bytes into `shared.state.buf`.
        thread::spawn({
            let shared = shared.clone();
            move || {
                let Shared { state, cvar } = &*shared;
                // Scratch buffer sized from the same constant as the queue so
                // `read_buf[..available]` can never go out of bounds.
                let mut read_buf = [0u8; READER_BUF_CAPACITY];
                loop {
                    // Only this thread's own reference is left: nobody can
                    // consume the data any more.
                    if Arc::strong_count(&shared) == 1 {
                        break;
                    }
                    let mut guard = state.lock();
                    debug_assert!(
                        guard.buf.len() <= READER_BUF_CAPACITY,
                        "buf should never exceed capacity"
                    );
                    match (guard.waker.take(), guard.buf.len()) {
                        // Queue full and nobody waiting: park until the
                        // consumer drains something (or the timeout elapses).
                        (None, READER_BUF_CAPACITY) => {
                            cvar.wait_for(&mut guard, PARK_TIMEOUT);
                            continue;
                        }
                        // Room left and nobody waiting: read ahead.
                        (None, _) => {}
                        // A task is waiting on an empty queue: keep its waker
                        // registered and go fetch data for it.
                        (Some(waker), 0) => guard.waker = Some(waker),
                        // A task is waiting although data is available: wake it.
                        // The lock is released first so a waker that polls
                        // re-entrantly cannot deadlock on `state`.
                        (Some(waker), _) => {
                            drop(guard);
                            waker.wake();
                            continue;
                        }
                    }
                    let available = READER_BUF_CAPACITY - guard.buf.len();
                    // Never hold the lock across the blocking read.
                    drop(guard);
                    let res = reader.read(&mut read_buf[..available]);
                    // An interrupted call carries no information; retry at once.
                    if matches!(&res, Err(e) if e.kind() == ErrorKind::Interrupted) {
                        continue;
                    }
                    let mut guard = state.lock();
                    // The consumer only ever removes bytes, so the free space
                    // can only have grown while the lock was released.
                    debug_assert!(
                        available <= READER_BUF_CAPACITY - guard.buf.len(),
                        "available space should not decrease"
                    );
                    match res {
                        Ok(0) => guard.is_eof = true,
                        Ok(n) => guard.buf.extend(&read_buf[..n]),
                        // Non-blocking source with nothing ready: back off until
                        // the consumer asks again or the timeout elapses.
                        Err(e) if e.kind() == ErrorKind::WouldBlock => {
                            cvar.wait_for(&mut guard, PARK_TIMEOUT);
                            continue;
                        }
                        // Fatal error: hand it to the consumer and mark the
                        // stream finished so later polls do not wait forever
                        // for a thread that has exited.
                        Err(e) => {
                            guard.err = Some(e);
                            guard.is_eof = true;
                        }
                    }
                    let finished = guard.is_eof;
                    let waker = guard.waker.take();
                    // Wake outside the critical section.
                    drop(guard);
                    if let Some(waker) = waker {
                        waker.wake();
                    }
                    if finished {
                        break;
                    }
                }
            }
        });
        AsyncReader {
            shared,
            _phantom: PhantomData,
        }
    }
}
impl<T> AsyncRead for AsyncReader<T>
where
T: Read + Send + 'static,
{
/// poll_read reads up to `buf.len()` bytes into `buf`.
/// Returns Poll::Ready(Ok(n)) with n==0 on EOF.
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
out_buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let Shared { state, cvar } = &*self.shared;
let mut state = state.lock();
// if there is data buffered, copy as much as requested and return Ready
if !state.buf.is_empty() {
let to_read = out_buf.len().min(state.buf.len());
let (front, back) = state.buf.as_slices();
out_buf[..front.len().min(to_read)].copy_from_slice(&front[..front.len().min(to_read)]);
if front.len() < to_read {
out_buf[front.len()..to_read].copy_from_slice(&back[..to_read - front.len()]);
}
state.buf.drain(..to_read);
// ask buf
cvar.notify_one();
return Poll::Ready(Ok(to_read));
}
// handle error
if let Some(e) = state.err.take() {
return Poll::Ready(Err(e));
}
// buffer empty
if state.is_eof {
// EOF and nothing in buffer -> 0 bytes read
return Poll::Ready(Ok(0));
}
// No data and not EOF: register waker and return Pending.
// Store a clone of the waker so the background thread can wake us.
if state
.waker
.as_ref()
.is_none_or(|w| !w.will_wake(cx.waker()))
{
state.waker = Some(cx.waker().clone());
}
cvar.notify_one();
Poll::Pending
}
}
impl<R> Drop for AsyncReader<R> {
fn drop(&mut self) {
self.shared.cvar.notify_one();
}
}
There are probably many bugs in my code, such as wakers that are never woken...
I did this because I want to avoid tokio::fs::File, and I may want to try other reactors in the future.
And I burned out before also wrapping W: Write into AsyncWriter<W>: futures_io::AsyncWrite.
Maybe interacting with the OS directly is the correct way.
I must stop myself from wasting time, and I strongly suspect that I'm on the wrong track.
I wonder:
- Why is my attempt so painful? Should things like `AsFd` be used instead?
- Are there any crates that can do this wrapping (for `futures_io`)?
- Is it true that I have to implement a simple reactor to avoid losing wakers?
Thanks a loooooooooooooooot!