Grab stdout/stderr as available

Hello Crustaceans!

I'm using Tokio's process::Command to run a command locally. When I use spawn, I know I get back a Child and from that Child, I can get ChildStdout and ChildStderr. How can I continuously poll to grab the latest output while waiting for the child to finish?

You can keep reading from the ChildStdout until you get a read of size 0, then call wait on the child. The child is guaranteed to close its stdout and stderr when it exits (but may also close them before exiting).

Note also that the stdout/stderr objects can be taken out of the Child object, allowing you to access both the pipes and the Child object simultaneously.

4 Likes

I'm really new to async in Rust and trying to get my head around it (mostly by trying to forget my knowledge of async Python which does things a bit differently). Am I on the right path with something like this:

let child = match command.spawn() {
        Ok(child) => child,
        Err(err) => {
            slog::error!(logging::get(), "Failed to spawn child process: {:?}", err);
            return Err(anyhow!(err));
        }
    };

    let stdout = match child.stdout.take() {
        Some(stdout) => stdout,
        None => {
            slog::error!(logging::get(), "Failed to get child stdout stream",);
            return Err(anyhow!("Failed to get child stdout stream"));
        }
    };

    let stderr = match child.stderr.take() {
        Some(stderr) => stderr,
        None => {
            slog::error!(logging::get(), "Failed to get child stderr stream",);
            return Err(anyhow!("Failed to get child stderr stream"));
        }
    };

    loop {
        select! {
            stdo = stdout => println!(stdo),
            stde = stderr => eprintln!(stde),
            output = child => println!("Done!"),
            complete => break,
        };
    }

You can't select directly on stdout and stderr because they implement AsyncRead, not Future.

I would probably spawn a task for each output:

let stdout_task = tokio::spawn(async move {
    let stdout = BufReader::new(stdout);
    let mut line = String::new();
    loop {
        line.clear();
        match stdout.read_line(&mut line) {
            Err(err) => return Err(err),
            Ok(0) => return Ok(()),
            Ok(_) => {
                println("{}", line);
            },
        }
    }
});

Then after spawning, I would wait in the following manner:

child.wait().await;
stdout_task.await;
stderr_task.await;
1 Like

Hmm, but if we wait in this manner, aren't we awaiting the child first and then awaiting the output? Maybe I need a join! here?

When you use tokio::spawn, the spawned task will start running immediately. Unlike raw futures, a spawned task is not lazy.

2 Likes

Oh, interesting. Okay, cool!

One more question: do you know if reading out of the buffer will remove the things I read from the buffer?

Some context on what I'm doing: sometimes the command can have tons and tons of output, and to keep the memory footprint small, i want to read this as the process runs so I can keep it trimmed down in a Vec:

        let mut line = String::new();
        let mut lines: Vec<u8> = Vec::new();
        loop {
            line.clear();
            match stdout_reader.read_line(&mut line).await {
                [...]
                Ok(_) => {
                    lines.extend_from_slice(line.as_bytes());

                    // keep only the last 512K bytes
                    let length = lines.len();
                    if length > 51200 {
                        lines.drain(..length - 51200).for_each(drop);
                    }
                }
            }
        }

What do you mean?

What I meant was will I be freeing system resources (memory in this case) by reading out of the buffer and draining it and trimming to the size I need. The only reason I'm doing this live reading is because sometimes a command can output gigabytes of data and I don't want to OOM the machine, so I want to read and throw away everything but hold on to the last 512K of the output. In other languages, I noticed that while I could certainly stream the output, the stdio/stderr output was still held on to by the internals of the implementation so it didn't really help with resource management.

The stdout implementation if buffered to reduce the cost of many small writes, but you can always call flush to empty the buffer.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.