Reading from a Tokio Async Subprocess blocks

I would appreciate some help from someone who understands async better than I. There are really two different questions in here, but I'll explain the problem first.

My program has a subprocess that I send commands to via stdin, and I read the responses from stdout/stderr. The subprocess writes results to stdout and errors to stderr. I'm using tokio::process::Command to run it. When I write a command to the subprocess's stdin, I can tell when it has finished processing by looking for 0x07 in the output on stdout, but the only way to know whether there was an error is to check stderr. I think the subprocess writes any errors to stderr before writing 0x07 to stdout, so as long as there's no data left to read from stderr after I read from stdout, I should be fine. However, seemingly every read function I tried from tokio::io::AsyncReadExt and tokio::io::AsyncBufReadExt causes my async function to stop progressing if I await a read while stderr has no data. I suspect it's related to the fact that the subprocess stays running, so stderr never closes. This is on Windows using MSVC.

My first workaround was to use tokio::time::timeout with a short duration like 1ms. That seemed to work fine, even if it felt a little clunky to me, and, presumably because of task scheduling, it sometimes waited up to 20ms before giving up.

Question 1: Is there an easy way to just poll the ChildStderr and return immediately if there is no data?

To try to eliminate that wait time, I tried implementing a future that calls poll_read directly on ChildStderr and returns Poll::Ready as soon as poll_read returns Poll::Pending. I came up with the code below, which seems to work.

Question 2: Is there anything horribly wrong with this code? I only understand pinning at the most basic level, so other than just trying things until it let me compile, I have no clue if this is valid. As a bonus, any other general guidance on how to implement this is appreciated.

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead;
use tokio::process::ChildStderr;

struct ErrFuture<'a> {
    stderr: &'a mut ChildStderr,
    buffer: Vec<u8>,
}

impl<'a> ErrFuture<'a> {
    fn new(stderr: &'a mut ChildStderr) -> Self {
        Self {
            stderr,
            buffer: vec![],
        }
    }
}

impl<'a> futures::future::Future for ErrFuture<'a> {
    type Output = io::Result<Vec<u8>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut buffer = [0; 1024];
        match Pin::new(&mut self.stderr).poll_read(cx, &mut buffer) {
            Poll::Ready(Ok(n)) => {
                log::trace!("Read {} bytes from stderr", n);
                self.buffer.extend_from_slice(&buffer[..n]);
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
            Poll::Pending => {}
        }
        Poll::Ready(Ok(self.buffer.clone()))
    }
}

For anyone who comes across this: as far as my second question is concerned, Pin is used here the same way it is used within the tokio code base, so it seems like it's the correct way to do it.

Generally it sounds like you should use some combination of join!, try_join! and select! to read/write simultaneously, instead of trying to detect if stderr is ready.

On question 1: yes, you can use poll_fn. It would be similar to your custom future.

On question 2: it looks OK. Usually futures are implemented by calling poll_read in a loop until it returns Poll::Pending, instead of requesting to be woken again immediately. You can also avoid the clone with mem::take.

Poll::Ready(Ok(mem::take(&mut self.buffer)))
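To make the `mem::take` point concrete, here is a minimal standalone sketch using only the standard library. The `Collected` type is hypothetical and stands in for the asker's future. `mem::take` swaps an empty `Vec` into the field and returns the old one, so the bytes are moved out rather than copied.

```rust
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Hypothetical future that hands back the bytes it has accumulated.
pub struct Collected {
    pub buffer: Vec<u8>,
}

impl Future for Collected {
    type Output = Vec<u8>;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Collected` is `Unpin`, so `Pin<&mut Self>` derefs mutably and we
        // can borrow the field directly. `mem::take` leaves `Vec::new()`
        // behind and returns the original allocation without cloning it.
        Poll::Ready(mem::take(&mut self.buffer))
    }
}
```

This works through `Pin<&mut Self>` only because the struct is `Unpin`; a future holding a self-referential or `!Unpin` field would need pin projection instead.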

20ms? That's really long. It sounds like your timeout was starved by some other task executing a blocking operation. The doc has a section on CPU-bound tasks and blocking code that I think you should read.

Thanks for the response @Alice. Definitely appreciated

I hadn't seen poll_fn before and it looks helpful, though I'm not clear how that would be used with something that implements AsyncRead.

I thought about calling poll_read in a loop, but I wasn't sure whether doing so would actually yield control from the current task. Clearly I don't quite have the right mental model for async yet.

Thanks for the pointer to mem::take.

It felt long. There shouldn't have been anything else going on in my program at the time, but it is part of a larger program, so maybe there were other tasks running that I'm not thinking of. I'll see if I can replicate it with a simpler case though that may be difficult without the specific subprocess I was calling.

You could implement it with poll_fn like this:

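// Assumed imports; the original post omitted them.
use futures::future::poll_fn;
use std::io::Result;
use std::pin::Pin;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::process::ChildStderr;

// Written against the pre-1.0 Tokio poll_read, which fills a &mut [u8]
// and returns the number of bytes read.
async fn consume_ready(stderr: &mut ChildStderr) -> Result<Vec<u8>> {
    let mut vec = Vec::new();
    poll_fn(|cx| loop {
        let mut buf = [0; 1024];
        match Pin::new(&mut *stderr).poll_read(cx, &mut buf) {
            // Zero bytes means the pipe was closed; stop instead of spinning.
            Poll::Ready(Ok(0)) => return Poll::Ready(Ok(())),
            Poll::Ready(Ok(n)) => {
                vec.extend_from_slice(&buf[..n]);
                log::trace!("Read {} bytes from stderr", n);
            }
            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
            // Nothing more is available right now, so finish immediately.
            Poll::Pending => return Poll::Ready(Ok(())),
        }
    }).await?;
    Ok(vec)
}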


The loop is how it is usually done. Tokio gives each task a cooperative budget, which ensures that even an always-ready leaf future will eventually return Poll::Pending. Unfortunately, IO resources from outside Tokio might be always ready, which could turn the loop into an infinite one.

Thanks alice. Much appreciated
