I'm working on an RPC interface that executes a program in a different process and forwards stdin and stdout to the process making the RPC request. The request itself is sent over HTTP; streams are passed through FIFOs (created with mkfifo). I'm running tokio::try_join on 3 futures:
copying own stdin into a fifo
copying from a fifo into own stdout
making the HTTP request
Overall it works, but I'm trying to improve error handling. For example, sometimes the HTTP request may fail. In that case, the futures responsible for copying data in streams should be cancelled. Unfortunately, the program hangs when it's supposed to exit.
This is the code triggering the hang:
let mut stdout = tokio::io::stdout();
let mut stdout_pipe_handle = tokio::fs::OpenOptions::new()
.read(true)
.write(true)
.create(false)
.open(stdout_pipe.path.as_path()).await?;
tokio::io::copy(&mut stdout_pipe_handle, &mut stdout).await?;
Using tokio-console, I was able to see that after the HTTP request fails, there's one task left that's busy:
This is the spawn_blocking call that hangs inside tokio:
This makes the program hang after main() returns. This is running on MacOS.
How can I avoid hanging here? I'm only interested in this working on MacOS and Linux, maybe there's a crate that supports asynchronous I/O on those platforms natively? The only ones I found were more or less abandoned.
Is the RPC process exiting when an HTTP request fails? I'm not very familiar with using named pipes, but it sounds like IO on them requires the pipe to be open on both ends
In the particular case I'm testing, the remote backend isn't even running, so nobody opens the pipe from the other end. Therefore, the HTTP request fails with "Connection refused". Therefore, no I/O occurs, but I still need to cancel the copy. This is just one possible error scenario where copy needs to be cancelled, but there are a few others too.
Operations on a tokio::fs::File are uncancelable because they are designed to work with real files. It is not designed for use with fifo pipes. Use an UnixStream for that.
Thank you! Is it just a drop-in replacement that I can use with copy too? Because I'm getting an error:
error: Os { code: 38, kind: Uncategorized, message: "Socket operation on non-socket" }
This implies that I should switch to domain sockets instead of FIFOs? I don't really have a requirement that it should work with FIFOs, I just need to connect data streams somehow
Are you using HTTP for metadata and trying to pass large data over the FIFO to avoid head-of-line blocking issues?
You might try out e.g. https://crates.io/crates/tokio-unix-ipc which allows this pattern by using file descriptor passing over the unix domain socket - for example, by passing a pipe() file descriptor or a memfd.
Okay, so turns out I was missing something in plain sight this whole time. I was already using this crate for reading from stdin:
it's a wrapper around tokio::io::unix::AsyncFd that implements AsyncWrite and AsyncRead. So all I had to do here is just ditch tokio::fs::File, and use this wrapper. So far all scenarios, both with errors and without, seem to work as expected. Thank you everyone for answers and help