Hello, I want to start a Command process, asynchronously feed it bytes to its stdin then read the output when it becomes available.
I think i am able to pass the bytes in, but the process is not exiting on it's own somehow.
I may eventually use tokio, but this is not a question about async API, but rather more fundamental about communicating with a Command
, i think.
Here's is what I have so far, can anyone see what is wrong?
use std::{
error::Error,
io::{Read, Write},
process::{Command, Stdio},
sync::{mpsc::Sender, Arc, RwLock},
thread::JoinHandle,
};
use tracing::trace;
type DynError = Box<dyn Error + 'static>;
fn main() -> Result<(), DynError> {
tracing_subscriber::fmt()
.pretty()
.with_thread_names(true)
// enable everything
.with_max_level(tracing::Level::TRACE)
// sets this to be the default, global collector for this application.
.init();
trace!("Logging initialized");
let txt = b"I am some text";
let mimer = Mimer::new()?;
for t in txt {
if let Some(mime) = mimer.feed(vec![*t]) {
println!("{:?}", mime);
return Ok(());
}
}
mimer.wait();
Err("Mime not found".into())
}
struct Mimer {
tx_stdin: Sender<Vec<u8>>,
output: Arc<RwLock<Option<String>>>,
feeder: JoinHandle<()>,
}
impl Mimer {
fn new() -> Result<Self, DynError> {
let output = Arc::new(RwLock::new(None));
let _output = output.clone();
// setup process
let mut process = Command::new("file")
.arg("--brief")
.arg("--mime-type")
.arg("-")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let mut stdin = process
.stdin
.take()
.ok_or::<DynError>("couldn't get stdin".into())?;
let mut stdout = process
.stdout
.take()
.ok_or::<DynError>("couldn't get stdout".into())?;
// start feeder
let (tx_stdin, rx_stdin) = std::sync::mpsc::channel::<Vec<u8>>();
let feeder = std::thread::spawn(move || {
let mut stdout_buf = Vec::new();
let mut bytes_fed = 0;
trace!("starting feeder");
for msg in rx_stdin {
stdin.write(&msg).map(|len| bytes_fed += len).ok();
trace!("bytes_fed: {}, msg_len: {}", bytes_fed, msg.len());
if let Ok(Some(status)) = process.try_wait() {
trace!("process exited with status {:?}", status);
stdout.read(&mut stdout_buf).expect(
"after the command returns our output buffer must contain something",
);
let mime = String::from_utf8_lossy(&stdout_buf).trim().to_string();
*_output.write().unwrap() = Some(mime);
break;
}
}
let mime = String::from_utf8_lossy(&process.wait_with_output().unwrap().stdout)
.trim()
.to_string();
*_output.write().unwrap() = Some(mime);
});
Ok(Self {
tx_stdin,
output,
feeder,
})
}
fn feed(&self, buf: Vec<u8>) -> Option<String> {
self.tx_stdin.send(buf).unwrap();
self.output
.read()
.unwrap()
.as_deref()
.map(|mime| mime.to_string())
}
fn wait(self) {
self.feeder
.join()
.expect("not sure how to handle this error");
}
}