I would appreciate some help from someone who understands async better than I. There are really two different questions in here, but I'll explain the problem first.
My program has a subprocess that I send commands to via stdin, and I read the responses from stdout/stderr. The subprocess writes results to stdout, but errors are written to stderr. I'm using tokio::process::Command to run the subprocess. When I write a command to the subprocess's stdin, I can tell when it has finished processing by looking for 0x07 in the output from stdout. The only way to know whether there was an error is to check stderr. I think the subprocess writes any errors to stderr before writing 0x07 to stdout, so as long as there's no data to be read from stderr after I read from stdout, I should be fine. However, seemingly every read function I tried from tokio::io::AsyncReadExt and tokio::io::AsyncBufReadExt causes my async function to stop progressing if I await a read operation while there is no data in stderr. I suspect it's related to the fact that the subprocess stays running, so stderr never closes. Also, this is on Windows using MSVC.
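For reference, the framing described above (read stdout until the 0x07 terminator) can be sketched with the standard library alone. This is a synchronous illustration, not the actual program: `read_response` and `END_OF_RESPONSE` are hypothetical names, and the reader is any `BufRead`. tokio's `AsyncBufReadExt` offers a `read_until` of the same shape, so an async version should look similar with `.await` added.

```rust
use std::io::{self, BufRead};

/// Byte the subprocess writes to stdout once a command has been processed.
const END_OF_RESPONSE: u8 = 0x07;

/// Reads one response from `stdout`, up to and excluding the 0x07 terminator.
/// Returns `Ok(None)` if the stream ended before a terminator was seen.
fn read_response<R: BufRead>(stdout: &mut R) -> io::Result<Option<Vec<u8>>> {
    let mut response = Vec::new();
    // `read_until` appends bytes up to and including the delimiter, or up to EOF.
    stdout.read_until(END_OF_RESPONSE, &mut response)?;
    if response.last() == Some(&END_OF_RESPONSE) {
        response.pop(); // drop the terminator itself
        Ok(Some(response))
    } else {
        Ok(None) // EOF without a terminator: the response is incomplete
    }
}
```

Only after this returns would stderr be checked, which is where the blocking read becomes a problem.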
My first workaround was to wrap the read in tokio::time::timeout with a short duration like 1ms. That seemed to work fine, even if it felt a little clunky to me. Presumably because of task scheduling, it sometimes waited up to 20ms before giving up.
Question 1: Is there an easy way to just poll the ChildStderr and return immediately if there is no data?
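One way to express "poll once and give up if nothing is ready" is a small helper that polls any future a single time. The sketch below uses only the standard library; `poll_once` and `NoopWake` are hypothetical names introduced here. The futures crate provides `FutureExt::now_or_never`, which does the same job. Whether a single poll reliably sees pending stderr data on Windows pipes is an assumption that would need testing.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; sufficient for a single "is it ready yet?" poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` exactly once. Returns `Some(output)` if it was already ready;
/// otherwise drops the future and returns `None` without waiting.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // Pin the future on the stack so it can be polled.
    let fut = pin!(fut);
    match fut.poll(&mut cx) {
        Poll::Ready(output) => Some(output),
        Poll::Pending => None,
    }
}
```

Applied to this problem, the idea would be to pass a read future on stderr to such a helper from inside the tokio runtime. Dropping a half-finished read is only safe if that read is cancel safe, so this is worth checking for whichever read method is used.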
To try to eliminate that wait time, I tried implementing a future that calls poll_read directly on ChildStderr and returns Poll::Ready whenever it gets Poll::Pending from poll_read. I came up with the code below, which seems to work.
Question 2: Is there anything horribly wrong with this code? I only understand pinning at the most basic level, so other than just trying things until it let me compile, I have no clue if this is valid. As a bonus, any other general guidance on how to implement this is appreciated.
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead;
use tokio::process::ChildStderr;

/// Future that collects whatever is currently readable from stderr.
struct ErrFuture<'a> {
    stderr: &'a mut ChildStderr,
    buffer: Vec<u8>,
}

impl<'a> ErrFuture<'a> {
    fn new(stderr: &'a mut ChildStderr) -> Self {
        Self {
            stderr,
            buffer: vec![],
        }
    }
}

impl<'a> futures::future::Future for ErrFuture<'a> {
    type Output = io::Result<Vec<u8>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut buffer = [0; 1024];
        match Pin::new(&mut self.stderr).poll_read(cx, &mut buffer) {
            Poll::Ready(Ok(n)) => {
                log::trace!("Read {} bytes from stderr", n);
                self.buffer.extend_from_slice(&buffer[..n]);
                // Got data: ask to be polled again in case there is more.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
            // Nothing available right now: fall through and finish.
            Poll::Pending => {}
        }
        // Resolve with everything collected so far.
        Poll::Ready(Ok(self.buffer.clone()))
    }
}
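One thing that stands out in the code above is the `Ok(n)` arm with `n == 0`: a zero-length read normally signals that the stream is closed, so waking and returning `Poll::Pending` would repeat forever if the subprocess ever exits. The sketch below shows the same "drain until nothing is available" logic as a plain loop with an explicit end-of-stream case. It does not use tokio: `TryRead` is a hypothetical stand-in trait with the same slice-based return shape as the `poll_read` call above, and `Scripted` is a fake reader that exists only to exercise the loop.

```rust
use std::collections::VecDeque;
use std::io;
use std::task::Poll;

/// Hypothetical stand-in for a pollable reader; NOT tokio's `AsyncRead`.
trait TryRead {
    fn try_read(&mut self, buf: &mut [u8]) -> Poll<io::Result<usize>>;
}

/// Collects everything available right now, stopping at `Pending` or end of stream.
fn drain_available<R: TryRead>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut collected = Vec::new();
    let mut chunk = [0u8; 1024];
    loop {
        match reader.try_read(&mut chunk) {
            // A zero-length read means the stream is closed; retrying would spin forever.
            Poll::Ready(Ok(0)) => break,
            Poll::Ready(Ok(n)) => collected.extend_from_slice(&chunk[..n]),
            Poll::Ready(Err(e)) => return Err(e),
            // Nothing more to read at the moment; return what we have.
            Poll::Pending => break,
        }
    }
    Ok(collected)
}

/// Fake reader for illustration: `Some(bytes)` is a ready read, `None` is `Pending`.
/// Each scripted chunk is assumed to fit in the caller's buffer.
struct Scripted {
    steps: VecDeque<Option<Vec<u8>>>,
}

impl TryRead for Scripted {
    fn try_read(&mut self, buf: &mut [u8]) -> Poll<io::Result<usize>> {
        match self.steps.pop_front() {
            Some(Some(bytes)) => {
                buf[..bytes.len()].copy_from_slice(&bytes);
                Poll::Ready(Ok(bytes.len()))
            }
            // An exhausted script behaves like a pipe with nothing to read.
            Some(None) | None => Poll::Pending,
        }
    }
}
```

Looping inside a single call also avoids the wake-and-return-Pending round trip through the scheduler, and returning the collected `Vec` by value avoids the `clone()`. In a real `Future::poll`, the same loop would call `poll_read` with the task's `Context` in place of `try_read`.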