How to maintain the order of a terminal command program output(stdout/stderr) with std::process::Command?

I wrote a Rust crate for "Command Process Output Capturer" usage like this:

mod output;
mod status;
mod test;
use crate::output::{Output, OutputType};
use crate::status::CommandStatus;

use anyhow::Result;
use crossbeam::channel::{unbounded, Receiver, Sender};
use encoding_rs::GB18030;
use std::io::{Read, Write};
use std::process::{Child, Command, Stdio};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub struct CommandRunner {
    command: Command,
    child: Arc<Mutex<Option<Child>>>,
    output_sender: Sender<Output>,
    output_receiver: Receiver<Output>,
    threads: Vec<JoinHandle<()>>,
    force_stop: Arc<AtomicBool>,
}

impl CommandRunner {
    pub fn new(command: &str) -> Result<Self> {
        // init command
        let parts: Vec<&str> = command.split_whitespace().collect();
        let (cmd_root, cmd_args) = if parts.len() > 1 {
            (parts[0], &parts[1..])
        } else {
            (parts[0], &[][..])
        };
        let mut command = Command::new(cmd_root);
        command
            .args(cmd_args)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped());

        // init output channel
        let (output_sender, output_receiver) = unbounded();

        // use `.spawn()` to check it the command is valid, if valid, termiate it immediately.
        let mut child = command.spawn()?;
        child.kill().unwrap();
        child.wait().unwrap();

        // return new instance
        Ok(Self {
            command,
            child: Arc::new(Mutex::new(None)),
            output_sender,
            output_receiver,
            threads: Vec::new(),
            force_stop: Arc::new(AtomicBool::new(false)),
        })
    }

    pub fn run(&mut self) {
        // 停止之前的进程(如果存在)
        self.stop();

        // 初始化子进程和相关字段
        let mut child = self.command.spawn().unwrap();
        self.force_stop
            .store(false, std::sync::atomic::Ordering::SeqCst);

        // 对于 stdout 流
        let force_stop_for_stdout = Arc::clone(&self.force_stop);
        let stdout = child.stdout.take().unwrap();
        let stdout_child = Arc::clone(&self.child);
        let stdout_sender = self.output_sender.clone();
        let stdout_thread = thread::spawn(move || {
            process_stream(
                stdout,
                &stdout_sender,
                false,
                stdout_child,
                force_stop_for_stdout,
            );
        });

        // 对于 stderr 流
        let force_stop_for_stderr = Arc::clone(&self.force_stop);
        let stderr = child.stderr.take().unwrap();
        let stderr_child = Arc::clone(&self.child);
        let stderr_sender = self.output_sender.clone();
        let stderr_thread = thread::spawn(move || {
            process_stream(
                stderr,
                &stderr_sender,
                true,
                stderr_child,
                force_stop_for_stderr,
            );
        });

        // 收集线程
        self.threads.push(stdout_thread);
        self.threads.push(stderr_thread);

        // update child process
        *self.child.lock().unwrap() = Some(child);
    }

    pub fn stop(&mut self) {
        // set force stop flag
        self.force_stop
            .store(true, std::sync::atomic::Ordering::SeqCst);

        // wait for threads to finish
        for thread in self.threads.drain(..) {
            thread.join().unwrap();
        }

        // kill child process first
        if let Some(child) = self.child.lock().unwrap().as_mut() {
            child.kill().unwrap();
            child.wait().unwrap();
        }

        // clear threads vec
        self.threads.clear();
    }

    pub fn is_stopped(&self) -> bool {
        matches!(self.check_status().unwrap(), CommandStatus::Stopped)
    }

    pub fn is_running(&self) -> bool {
        matches!(self.check_status().unwrap(), CommandStatus::Running)
    }

    fn check_status(&self) -> Result<CommandStatus, String> {
        if self.child.lock().unwrap().is_none() {
            return Err("Child process is not initialized yet.".to_string());
        }
        Ok(check_child_process_status(&self.child))
    }

    pub fn get_one_line_output(&self) -> Option<Output> {
        self.output_receiver.try_recv().ok()
    }
}

impl Drop for CommandRunner {
    fn drop(&mut self) {
        self.stop();
    }
}

fn process_stream<R: Read>(
    mut stream: R,
    sender: &Sender<Output>,
    is_stderr: bool,
    child: Arc<Mutex<Option<Child>>>,
    force_stop: Arc<AtomicBool>,
) {
    let mut buffer = [0; 1024];
    let mut leftover = Vec::new();

    while !force_stop.load(std::sync::atomic::Ordering::SeqCst)
        && check_child_process_status(&child) != CommandStatus::Stopped
    {
        match stream.read(&mut buffer) {
            Ok(0) => break, // 流结束
            Ok(n) => {
                leftover.extend_from_slice(&buffer[..n]);
                process_buffer(sender, &mut leftover, is_stderr);
            }
            Err(_) => break, // 读取错误
        }
    }
}

fn process_buffer(sender: &Sender<Output>, buffer: &mut Vec<u8>, is_stderr: bool) {
    while let Some(newline_pos) = buffer.iter().position(|&b| b == b'\n') {
        let line = buffer.drain(..=newline_pos).collect::<Vec<_>>();
        let (decoded, _, _) = GB18030.decode(&line);
        let output = if is_stderr {
            Output::new(OutputType::StdErr, decoded.trim().to_owned())
        } else {
            Output::new(OutputType::StdOut, decoded.trim().to_owned())
        };
        sender.send(output).unwrap();
    }
}

fn check_child_process_status(child: &Arc<Mutex<Option<Child>>>) -> CommandStatus {
    let mut status = CommandStatus::Stopped;
    if let Ok(mut child_guard) = child.lock() {
        if let Some(child) = child_guard.as_mut() {
            if let Ok(result) = child.try_wait() {
                match result {
                    Some(_) => status = CommandStatus::Stopped,
                    None => status = CommandStatus::Running,
                }
            }
        }
    }
    status
}

, but the unit test

#[test]
fn test_std_output_and_error_from_python_script() {
    let mut executor = CommandRunner::new("python ./tests/test_error.py").unwrap();
    executor.run();

    let mut outputs = Vec::new();
    while executor.is_running() {
        if let Some(output) = executor.get_one_line_output() {
            outputs.push(output);
        }
    }

    // check outputs
    println!("the outputs are:{:?}", outputs);

    assert_eq!(outputs.len(), 4);
    assert_eq!(outputs[0].as_str(), "[1]:normal print.");
    assert_eq!(outputs[0].get_type(), OutputType::StdOut);

    assert_eq!(outputs[1].as_str(), "[2]:error print.");
    assert_eq!(outputs[1].get_type(), OutputType::StdErr);

    assert_eq!(outputs[2].as_str(), "[3]:normal print.");
    assert_eq!(outputs[2].get_type(), OutputType::StdOut);

    assert_eq!(outputs[3].as_str(), "[4]:error print.");
    assert_eq!(outputs[3].get_type(), OutputType::StdErr);
}

failed with error msg like assertion left == right failed left: "[3]:normal print." right: "[2]:error print." with the corresponding python script:

import sys

if __name__ == "__main__":
    for i in range(1,5):
        if i%2 == 0:
            print(f"[{i}]:error print.", file=sys.stderr)
        else:
            print(f"[{i}]:normal print.")

The reason for failure is due the caught order is random between channel stdout and stderr.

Before, I first met this issue when writing code using concurrency approach with tokio or even low level crate mio and crossbeam.
I thought may be the concurrency may cause it since both stdout and stderr are processed in the same thread. So then I tried this multi-thread version, but it still failed.

I wonder if this issue is inevitable. I know this issue would somehow be minored by adding flush in the Python script.
But this is definitely not the solution. After all, I can't force users to do so, while generally when running such python script manually in the terminal shell would not have such an output order issue.

The full code is here.

Check out highlight-stderr and io-mux for a (non-portable) way to do this. It’s quite clever!

3 Likes

And the portable order-preserving approach is to use the same pipe for both stdout and stderr, giving up on distinguishing them.

4 Likes

Thanks your answers. @uberjay, for io-mux, since I am working on windows, it is not a good choice, because even the feature experimental-unix-support is triggered on, it can't be compiled on windows.
@kpreid, for the suggestion "use the same pipe for both stdout and stderr", I did try so with crate os_pipe, but the output order is still mixed. I guess maybe I need to manually integrate the error pipe into the stdout pipe.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.