Tokio: task hangs when cancelling copy from a fifo(7)

Hello,

I'm working on an RPC interface that executes a program in a different process and forwards stdin and stdout to the process making the RPC request. The request itself is sent over HTTP; the streams are passed through FIFOs (created with mkfifo). I'm running tokio::try_join! on three futures:

  1. copying own stdin into a fifo
  2. copying from a fifo into own stdout
  3. making the HTTP request

Overall it works, but I'm trying to improve error handling. For example, the HTTP request may sometimes fail. In that case, the futures copying the streams should be cancelled. Unfortunately, the program hangs when it's supposed to exit.

This is the code triggering the hang:

let mut stdout = tokio::io::stdout();

let mut stdout_pipe_handle = tokio::fs::OpenOptions::new()
    .read(true)
    .write(true)
    .create(false)
    .open(stdout_pipe.path.as_path()).await?;

tokio::io::copy(&mut stdout_pipe_handle, &mut stdout).await?;

Using tokio-console, I was able to see that after the HTTP request fails, there's one task left that's busy:

This is the spawn_blocking call that hangs inside tokio:

This makes the program hang after main() returns. This is running on macOS.

How can I avoid hanging here? I only need this to work on macOS and Linux. Maybe there's a crate that natively supports asynchronous I/O on those platforms? The only ones I found were more or less abandoned.

Thank you!

Is the RPC process exiting when an HTTP request fails? I'm not very familiar with named pipes, but it sounds like I/O on them requires the pipe to be open on both ends.

In the particular case I'm testing, the remote backend isn't even running, so nobody opens the pipe from the other end and the HTTP request fails with "Connection refused". No I/O occurs, but I still need to cancel the copy. This is just one error scenario where the copy needs to be cancelled; there are a few others too.

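Operations on a tokio::fs::File are uncancelable because it is designed to work with regular files: each operation runs as a blocking call on a separate thread (via spawn_blocking), and a blocking call that has already started cannot be interrupted. It is not designed for use with FIFOs. Use a UnixStream for that.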

Edit: Please see tokio::net::unix::pipe instead.

Thank you! Is it just a drop-in replacement that I can use with copy too? Because I'm getting an error:

error: Os { code: 38, kind: Uncategorized, message: "Socket operation on non-socket" }

Does this imply that I should switch to domain sockets instead of FIFOs? I don't have a requirement to use FIFOs; I just need to connect the data streams somehow.

Are you using HTTP for metadata and trying to pass large data over the FIFO to avoid head-of-line blocking issues?

You might try out e.g. https://crates.io/crates/tokio-unix-ipc which allows this pattern by using file descriptor passing over the unix domain socket - for example, by passing a pipe() file descriptor or a memfd.

Let me describe the full sequence of calls and processes when it works.

Process A:

  1. Creates 2 temporary FIFOs
  2. Makes an HTTP request to process B that includes the FIFO paths
  3. Copies own stdin to the FIFO 1
  4. Copies from FIFO 2 to own stdout

Process B:

  1. Accepts HTTP requests
  2. Runs a subprocess C
  3. Copies from FIFO 1 to subprocess C stdin
  4. Copies from subprocess C stdout to FIFO 2

Process C:

  1. Communicates over stdin and stdout (it's actually git-receive-pack or git-upload-pack)
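
Steps 2 to 4 of process B can be sketched roughly as follows. This is a blocking, std-only illustration rather than my actual async code: run_with_streams is a made-up name, and the generic reader and writer stand in for FIFO 1 and FIFO 2.

```rust
use std::io::{self, Read, Write};
use std::process::{Command, ExitStatus, Stdio};
use std::thread;

/// Runs `program`, feeding it `input` on stdin and copying its stdout
/// into `output`. Returns the child's exit status.
fn run_with_streams<R, W>(program: &str, mut input: R, output: &mut W) -> io::Result<ExitStatus>
where
    R: Read + Send + 'static,
    W: Write,
{
    let mut child = Command::new(program)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;
    let mut child_stdin = child.stdin.take().expect("stdin was piped");
    let mut child_stdout = child.stdout.take().expect("stdout was piped");

    // Feed stdin on its own thread so that a full pipe buffer on either
    // side cannot deadlock the two copies.
    let feeder = thread::spawn(move || -> io::Result<()> {
        io::copy(&mut input, &mut child_stdin)?;
        Ok(()) // child_stdin is dropped here, which signals EOF to the child
    });

    io::copy(&mut child_stdout, output)?;
    feeder.join().expect("feeder thread panicked")?;
    child.wait()
}
```

For example, running cat this way with an in-memory input should hand back the same bytes, which is the pass-through behaviour I want from the whole chain.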

The desired behaviour is that when someone runs program A, it would behave just like program C in terms of stdin and stdout. So running this command:

echo test | program-a > file.txt

would be the same as:

echo test | program-c > file.txt

The error case I'm testing is when process B is not available or fails in some other way, hence the cancellation.

Okay, so turns out I was missing something in plain sight this whole time. I was already using this crate for reading from stdin:

it's a wrapper around tokio::io::unix::AsyncFd that implements AsyncWrite and AsyncRead. So all I had to do here is just ditch tokio::fs::File, and use this wrapper. So far all scenarios, both with errors and without, seem to work as expected. Thank you everyone for answers and help :raised_hands:
