Asynchronously feed bytes to a Command

Hello, I want to start a Command process, asynchronously feed bytes to its stdin, and then read the output when it becomes available.

I think I am able to pass the bytes in, but somehow the process never exits on its own.

I may eventually use tokio, but this is not a question about the async API. I think it is more fundamental: it's about communicating with a Command.

Here's what I have so far. Can anyone see what is wrong?

use std::{
    error::Error,
    io::{Read, Write},
    process::{Command, Stdio},
    sync::{mpsc::Sender, Arc, RwLock},
    thread::JoinHandle,
};
use tracing::trace;

type DynError = Box<dyn Error + 'static>;

fn main() -> Result<(), DynError> {
    tracing_subscriber::fmt()
        .pretty()
        .with_thread_names(true)
        // enable everything
        .with_max_level(tracing::Level::TRACE)
        // sets this to be the default, global collector for this application.
        .init();
    trace!("Logging initialized");

    let txt = b"I am some text";

    let mimer = Mimer::new()?;

    for t in txt {
        if let Some(mime) = mimer.feed(vec![*t]) {
            println!("{:?}", mime);
            return Ok(());
        }
    }

    mimer.wait();

    Err("Mime not found".into())
}

struct Mimer {
    tx_stdin: Sender<Vec<u8>>,
    output: Arc<RwLock<Option<String>>>,
    feeder: JoinHandle<()>,
}

impl Mimer {
    fn new() -> Result<Self, DynError> {
        let output = Arc::new(RwLock::new(None));
        let _output = output.clone();

        // setup process
        let mut process = Command::new("file")
            .arg("--brief")
            .arg("--mime-type")
            .arg("-")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn()?;
        let mut stdin = process
            .stdin
            .take()
            .ok_or::<DynError>("couldn't get stdin".into())?;
        let mut stdout = process
            .stdout
            .take()
            .ok_or::<DynError>("couldn't get stdout".into())?;

        // start feeder
        let (tx_stdin, rx_stdin) = std::sync::mpsc::channel::<Vec<u8>>();
        let feeder = std::thread::spawn(move || {
            let mut stdout_buf = Vec::new();
            let mut bytes_fed = 0;
            trace!("starting feeder");
            for msg in rx_stdin {
                stdin.write(&msg).map(|len| bytes_fed += len).ok();
                trace!("bytes_fed: {}, msg_len: {}", bytes_fed, msg.len());
                if let Ok(Some(status)) = process.try_wait() {
                    trace!("process exited with status {:?}", status);
                    stdout.read(&mut stdout_buf).expect(
                        "after the command returns our output buffer must contain something",
                    );
                    let mime = String::from_utf8_lossy(&stdout_buf).trim().to_string();
                    *_output.write().unwrap() = Some(mime);
                    break;
                }
            }
            let mime = String::from_utf8_lossy(&process.wait_with_output().unwrap().stdout)
                .trim()
                .to_string();
            *_output.write().unwrap() = Some(mime);
        });

        Ok(Self {
            tx_stdin,
            output,
            feeder,
        })
    }

    fn feed(&self, buf: Vec<u8>) -> Option<String> {
        self.tx_stdin.send(buf).unwrap();
        self.output
            .read()
            .unwrap()
            .as_deref()
            .map(|mime| mime.to_string())
    }

    fn wait(self) {
        self.feeder
            .join()
            .expect("not sure how to handle this error");
    }
}

I see multiple problems in this code:

  • It only checks whether the process has exited immediately after calling stdin.write(). Assuming the child process (plus the kernel's pipe handling, etc.) takes some time to do its work, the feeder thread then blocks waiting for the next message on rx_stdin and never checks again whether the process has exited. (This is probably the main reason your program isn't working.)
  • It waits for the process to exit before it tries to read the stdout pipe. This will deadlock if the process's output does not fit in the pipe buffer, because the child process will wait forever for space to write.
  • Similarly to the first problem, but internal to your program: feed() only looks for output immediately after tx_stdin.send(), and your main loop never checks again later. It just assumes failure.

In general, interacting with a child process involves two or three independent activities:

  1. Take the data from your internal source (Mimer::feed()) and write it into the child process's stdin pipe. This blocks if the child process reads slower than you write for any reason. If the child process has exited, the write fails with an error (a broken pipe).
  2. Read the data from the child process's stdout pipe, and either store it in a buffer or process it as you receive it. This blocks while the child process is either waiting for input or busy processing the data.
  3. Wait for the process to exit and check the status — if it returned a non-successful exit code then the stdout might not contain what you were hoping (this depends on the particular program).

You can perform (3) after (2) finishes, but you must perform (1) and (2) simultaneously, which can be done either using blocking IO on threads, or async non-blocking IO.
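A minimal sketch of those three activities using blocking IO on threads, assuming a Unix-like system. `run_streaming` is a hypothetical helper (not a std API), and `cat` stands in for `file` so the output is easy to check. Input chunks arrive over an `mpsc` channel, like `rx_stdin` in the question.

```rust
use std::io::{self, Read, Write};
use std::process::{Command, ExitStatus, Stdio};
use std::sync::mpsc::Receiver;
use std::thread;

/// Hypothetical helper: runs `cmd`, writing every chunk received on `chunks`
/// to its stdin while reading its stdout concurrently.
fn run_streaming(mut cmd: Command, chunks: Receiver<Vec<u8>>) -> io::Result<(ExitStatus, Vec<u8>)> {
    let mut child = cmd.stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
    let mut stdin = child.stdin.take().expect("stdin was piped");
    let mut stdout = child.stdout.take().expect("stdout was piped");

    // Activity (1): a separate thread copies chunks into the stdin pipe.
    // The loop ends when every Sender is dropped; `stdin` is then dropped
    // as well, which closes the pipe so the child sees end-of-file.
    let writer = thread::spawn(move || -> io::Result<()> {
        for chunk in chunks {
            stdin.write_all(&chunk)?;
        }
        Ok(())
    });

    // Activity (2): meanwhile, the calling thread drains stdout, so the child
    // never blocks on a full stdout pipe. read_to_end returns at EOF, i.e.
    // once the child has closed its stdout (normally when it exits).
    let mut output = Vec::new();
    stdout.read_to_end(&mut output)?;

    // Activity (3): only now wait for the exit status.
    let status = child.wait()?;
    writer.join().expect("writer thread panicked")?;
    Ok((status, output))
}
```

To use it, create a channel, send the chunks (from any thread), drop the sender, and call `run_streaming(Command::new("cat"), rx)`. If the child may exit before consuming all of its input, the writer will typically fail with `io::ErrorKind::BrokenPipe`; depending on the program, you may prefer to ignore that error rather than return it.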


Hi, sorry for the slow response.

So I tried to absorb your advice and was able to simplify the code. I start two threads, one to feed and one to wait; the waiter uses wait_with_output().

But the same thing still seems to happen: all the bytes get sent, but then the process just doesn't exit. Any idea why?

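use std::{
    io::Write,
    process::{Command, Stdio},
    str::FromStr,
    sync::mpsc::Sender,
    thread::JoinHandle,
};
use mime::Mime;
use thiserror::Error;
use tracing::trace;

fn main() {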
    tracing_subscriber::fmt()
        .pretty()
        .with_thread_names(true)
        // enable everything
        .with_max_level(tracing::Level::TRACE)
        // sets this to be the default, global collector for this application.
        .init();
    trace!("Logging initialized");

    let txt = b"I am some text";

    let mimer = Mimer::new().unwrap();

    for t in txt {
        mimer.feed(vec![*t])
    }

    let mime = mimer.wait().unwrap();

    println!("{:?}", mime);
}

struct Mimer {
    tx_stdin: Sender<Vec<u8>>,
    // _feeder: JoinHandle<Result<(), MimerError>>,
    waiter: JoinHandle<Result<Mime, MimerError>>,
}

#[derive(Debug, Error)]
enum MimerError {
    #[error(transparent)]
    Io(#[from] std::io::Error),
    #[error(transparent)]
    Mime(#[from] mime::FromStrError),
}

impl Mimer {
    fn new() -> Result<Self, MimerError> {
        // setup process
        let mut process = Command::new("file")
            .arg("--brief")
            .arg("--mime-type")
            .arg("-")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn()?;
        let mut stdin = process.stdin.take().expect("only taken by me");

        // start feeder
        let (tx_stdin, rx_stdin) = std::sync::mpsc::channel::<Vec<u8>>();
        std::thread::spawn(move || {
            let mut bytes_fed = 0;
            trace!("starting feeder");
            for msg in rx_stdin {
                stdin.write_all(&msg)?;
                bytes_fed += msg.len();
                trace!("bytes_fed: {}, msg_len: {}", bytes_fed, msg.len());
            }
            Result::<(), MimerError>::Ok(())
        });

        // start waiter
        let waiter = std::thread::spawn(move || {
            let output = process.wait_with_output()?;

            Ok(Mime::from_str(
                String::from_utf8_lossy(&output.stdout).trim(),
            )?)
        });

        Ok(Self { tx_stdin, waiter })
    }

    fn feed(&self, buf: Vec<u8>) {
        self.tx_stdin.send(buf).ok();
    }

    fn wait(self) -> Result<Mime, MimerError> {
        self.waiter.join().unwrap() // TODO: handle a panic in the waiter thread
    }
}

You don't have any arrangement to close stdin once all of the data has been sent, so the child process keeps waiting to see whether you will send it more bytes.

The simplest way to do this is by closing the internal channel, i.e. dropping tx_stdin, after all the bytes have been provided. (You'll need to change the type of Mimer::tx_stdin to be Option<Sender<Vec<u8>>> so that it can be replaced with None to drop the sender.) Then the for msg in rx_stdin loop will end, and the feeder thread will drop stdin (so the child sees end-of-file) and exit.
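A sketch of the Option<Sender<...>> approach, assuming a Unix-like system. `Feeder` and `finish` are hypothetical names; the sketch keeps the question's feeder and waiter threads but uses `cat` instead of `file` and returns raw bytes instead of a `Mime`, so it needs only std.

```rust
use std::io::{self, Write};
use std::process::{Command, Stdio};
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for `Mimer`: the sender is an `Option` so it can be
/// replaced with `None`, which drops it and closes the channel.
struct Feeder {
    tx_stdin: Option<Sender<Vec<u8>>>,
    waiter: JoinHandle<io::Result<Vec<u8>>>,
}

impl Feeder {
    fn new(mut cmd: Command) -> io::Result<Self> {
        let mut child = cmd.stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
        let mut stdin = child.stdin.take().expect("stdin was piped");
        let (tx, rx) = mpsc::channel::<Vec<u8>>();

        // Feeder thread: the loop ends once the Sender is dropped; `stdin` is
        // dropped when the closure returns, so the child reads EOF.
        thread::spawn(move || -> io::Result<()> {
            for msg in rx {
                stdin.write_all(&msg)?;
            }
            Ok(())
        });

        // Waiter thread: wait_with_output() reads stdout to EOF and then
        // waits for the child to exit.
        let waiter = thread::spawn(move || -> io::Result<Vec<u8>> {
            Ok(child.wait_with_output()?.stdout)
        });

        Ok(Feeder { tx_stdin: Some(tx), waiter })
    }

    fn feed(&self, buf: Vec<u8>) {
        // After finish() this is a no-op. A send error would only mean the
        // feeder thread has already stopped, so it is ignored here.
        if let Some(tx) = &self.tx_stdin {
            let _ = tx.send(buf);
        }
    }

    /// Drops the Sender, which ends the feeder loop and closes stdin.
    fn finish(&mut self) {
        self.tx_stdin = None;
    }

    fn wait(mut self) -> io::Result<Vec<u8>> {
        self.finish(); // without this, join() could block forever
        self.waiter.join().expect("waiter thread panicked")
    }
}
```

Messages already queued in the channel are still delivered after the sender is dropped, so nothing fed before finish() is lost.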


Great! Even better, since the wait method consumes self, I can just drop(self.tx_stdin). Perfect!

Thanks for your help!
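The drop(self.tx_stdin) trick works because wait takes self by value: the sender can be moved out and dropped before self.waiter is moved out to be joined. Rust allows such partial moves as long as the struct does not implement Drop. A process-free sketch of the same pattern, using a hypothetical `Collector` type whose worker thread plays the role of the feeder loop:

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical example: a worker thread collects bytes until the channel
/// closes, mirroring how the feeder loop in `Mimer` ends.
struct Collector {
    tx: Sender<Vec<u8>>,
    worker: JoinHandle<Vec<u8>>,
}

impl Collector {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel::<Vec<u8>>();
        let worker = thread::spawn(move || {
            let mut all = Vec::new();
            // This loop ends only when every Sender has been dropped.
            for msg in rx {
                all.extend_from_slice(&msg);
            }
            all
        });
        Collector { tx, worker }
    }

    fn feed(&self, buf: Vec<u8>) {
        self.tx.send(buf).expect("worker is still running");
    }

    /// Consumes `self`, so the sender can be moved out and dropped before
    /// joining. Without the drop, join() would block forever.
    fn wait(self) -> Vec<u8> {
        drop(self.tx);
        self.worker.join().expect("worker thread panicked")
    }
}
```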
