Copying a non-Send Read to a Write without blocking the thread

First some context to prevent an XY-problem:

I wrote a tool a while back that basically allows grep to work on many different file types: ripgrep-all (rga, on GitHub as phiresky/ripgrep-all), which is ripgrep, but can also search in PDFs, E-Books, Office documents, zip, tar.gz, etc.

The most complicated functionality is being able to recurse down into archives, e.g. extracting the content of a pdf that's inside a zip that's inside a tar archive.

The main architecture was a set of "adapters" that have an interface of adapt(metadata, Box<Read>, Box<Write>) -> () where they just read some binary file format and write out the contents to a write stream.

In an effort to make it more expandable I wanted to allow more flexible user-configurable chaining of adapters that work by spawning external programs. For example, you could have an adapter that converts other character encodings to UTF8.

To do this, I planned to change the interface to something like adapt(metadata, Box<Read>) -> Iter<Box<Read>>, where an adapter wraps the input and returns a set of inner "files" that are Reads and can then be passed to more adapters. The adapters include internal adapters written in Rust that e.g. query SQLite databases or decompress archives, plus custom configurable ones that spawn subprocesses.
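To make the proposed interface concrete, here is a minimal sketch of what such a trait could look like. The names Metadata, Input, Adapter, UpperReader and Upper are all hypothetical and not taken from ripgrep-all; the toy adapter just uppercases ASCII bytes as they stream through and yields a single inner "file".

```rust
use std::io::{self, Read};

/// Hypothetical metadata type; the real tool's metadata is not shown in the post.
struct Metadata {
    name: String,
}

/// Boxed reader passed between adapters (assumed shape).
type Input = Box<dyn Read>;

/// Sketch of the proposed interface: one input, zero or more inner "files".
trait Adapter {
    fn adapt(&self, meta: &Metadata, input: Input) -> Box<dyn Iterator<Item = Input>>;
}

/// Streaming wrapper that uppercases ASCII bytes as they pass through.
struct UpperReader {
    inner: Input,
}

impl Read for UpperReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        buf[..n].make_ascii_uppercase();
        Ok(n)
    }
}

/// Toy adapter that returns exactly one inner file.
struct Upper;

impl Adapter for Upper {
    fn adapt(&self, _meta: &Metadata, input: Input) -> Box<dyn Iterator<Item = Input>> {
        let wrapped: Input = Box::new(UpperReader { inner: input });
        Box::new(std::iter::once(wrapped))
    }
}
```

Because every adapter takes and returns the same boxed reader type, the output of one adapter can be fed straight into the next one.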


Now the question:

How can I convert a Read to a Write without blocking the whole thread? For example, the zip crate gives me a Read for each file in the zip. I want to pass this Read to a subprocess calling the pdftotext binary, returning the output stream of pdftotext as another Read.

Here are some ideas I could think of:

  1. Create a new thread that runs std::io::copy(readFromZip, subprocess.stdin). But the Read I get from the zip crate (and other sources) is not Send, so I can't move it to a new thread. If I run std::io::copy on the main thread, then it deadlocks, since the subprocess is waiting for me to read from stdout before accepting more data on stdin. See this code; the streaming test hangs.
  2. Make the adapter interface async (async adapt(metadata, input: AsyncBufRead) -> Iter<AsyncBufRead>). Then everything could stay on the same thread. But it seems like this would complicate my architecture a lot, and many things I use don't support async outputs (e.g. the tar and zip crates).
  3. Run the whole adapter in a new thread. Then the Iter returned by the zip adapter is already in a separate thread from my main logic, and the subsequent adapter can run on the main thread. This doesn't work when chaining two zip adapters though, since the second instance of ZipArchive has to be on the same thread as the first.
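A variation on idea 1 that is not discussed in the thread: the subprocess's stdout handle is Send, so the helper thread can drain stdout while the non-Send Read stays on the calling thread. The sketch below uses only the standard library; pipe_through is a hypothetical helper name, and it buffers the whole output in memory instead of returning a lazy Read, so it only illustrates how the deadlock is avoided.

```rust
use std::io::{self, Read};
use std::process::{Command, Stdio};
use std::thread;

/// Feeds `input` to the child's stdin on the current thread while a helper
/// thread collects the child's stdout. `input` does not need to be `Send`.
fn pipe_through(cmd: &mut Command, input: &mut dyn Read) -> io::Result<Vec<u8>> {
    let mut child = cmd.stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
    let mut stdin = child.stdin.take().expect("stdin was piped");
    let mut stdout = child.stdout.take().expect("stdout was piped");

    // The helper thread keeps reading stdout, so the child never blocks on a
    // full output pipe while we are still writing to its stdin.
    let drain = thread::spawn(move || {
        let mut out = Vec::new();
        stdout.read_to_end(&mut out).map(|_| out)
    });

    let copied = io::copy(input, &mut stdin);
    drop(stdin); // close stdin so the child sees end of input

    let out = drain.join().expect("drain thread panicked")?;
    child.wait()?;
    copied?;
    Ok(out)
}
```

For example, pipe_through(&mut Command::new("cat"), &mut reader) would round-trip the input through cat on a Unix system; cat is only a stand-in here for a converter such as pdftotext.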

Sorry that this question is somewhat ambiguous. I've basically been stuck on this for a long time and need to figure it out to finally be able to publish a new version of my tool :confused:

Is the Read you get from zip really not Send, or is it just because you put it in a Box without specifying Send on it? You probably want Box<dyn Read + Send + Sync> and similarly for Write.
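As a small illustration of that point, a boxed reader can only be moved into a spawned thread if the trait object type itself carries the Send bound. This is a sketch with a hypothetical helper name (read_in_background); it assumes the underlying reader really is Send, which is exactly what is in question for the zip reader.

```rust
use std::io::{self, Read};
use std::thread;

/// Reads `input` to the end on a helper thread.
/// With a plain `Box<dyn Read>` parameter, the `thread::spawn` call below
/// would not compile, because the closure capturing it would not be `Send`.
fn read_in_background(
    mut input: Box<dyn Read + Send>,
) -> thread::JoinHandle<io::Result<Vec<u8>>> {
    thread::spawn(move || {
        let mut out = Vec::new();
        input.read_to_end(&mut out).map(|_| out)
    })
}
```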

Also, try to remember to include dyn in front of the type when using a trait as a type. You should be getting warnings about this, and it makes it easier for me to read, as I know whether something is a type or a trait.

The Read from zip is not Send. And it's not just the zip crate, but tar and probably others as well. My issue is more with the logic of my code in general than with any specific Rust safety thing I'd guess.

The Box types in my post are just examples; the actual code looks somewhat different, with a mix of impl _ , &dyn _ and Box. I actually had impl Read in the post before, since (I'm pretty sure) it doesn't really matter. I intentionally wasn't too specific with the types because I don't really care how the specifics look, and I can change that easily if there's some issue.

Why does "the second instance of ZipArchive has to be on the same thread as the first"? They're two separate decoders, right? Maybe you can sort this out by creating it from a new thread. (One way to think of this approach is using the zip decoder as if it was a subprocess.)

Yeah, I do see the issue. You'll need to create the zip object on the thread you want to use it on.
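One way to sketch the "create it on the thread you want to use it on" idea: pass a Send closure that builds the reader, run it on a worker thread, and forward the bytes over a bounded channel. Everything here is hypothetical (LocalReader is a stand-in for a non-Send reader such as the one from the zip crate, and spawn_reader and ChannelReader are made-up names); it uses only the standard library.

```rust
use std::io::{self, Read};
use std::rc::Rc;
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Stand-in for a reader that is not `Send` (the `Rc` makes it `!Send`).
struct LocalReader {
    data: Rc<Vec<u8>>,
    pos: usize,
}

impl Read for LocalReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let rest = &self.data[self.pos..];
        let n = rest.len().min(buf.len());
        buf[..n].copy_from_slice(&rest[..n]);
        self.pos += n;
        Ok(n)
    }
}

/// Receiving end of the channel, usable as a `Read` on any thread.
struct ChannelReader {
    rx: Receiver<io::Result<Vec<u8>>>,
    current: Vec<u8>,
    pos: usize,
}

impl Read for ChannelReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Fetch the next chunk once the current one is used up.
        while self.pos == self.current.len() {
            match self.rx.recv() {
                Ok(Ok(chunk)) => {
                    self.current = chunk;
                    self.pos = 0;
                }
                Ok(Err(e)) => return Err(e),
                Err(_) => return Ok(0), // sender dropped: end of stream
            }
        }
        let rest = &self.current[self.pos..];
        let n = rest.len().min(buf.len());
        buf[..n].copy_from_slice(&rest[..n]);
        self.pos += n;
        Ok(n)
    }
}

/// Builds the reader on a new thread, so only the closure has to be `Send`.
fn spawn_reader<F>(make_reader: F) -> ChannelReader
where
    F: FnOnce() -> Box<dyn Read> + Send + 'static,
{
    let (tx, rx) = sync_channel(4); // bounded, so the worker cannot run far ahead
    thread::spawn(move || {
        let mut reader = make_reader();
        let mut buf = [0u8; 8192];
        loop {
            match reader.read(&mut buf) {
                Ok(0) => break,
                Ok(n) => {
                    if tx.send(Ok(buf[..n].to_vec())).is_err() {
                        break; // consumer went away
                    }
                }
                Err(e) => {
                    let _ = tx.send(Err(e));
                    break;
                }
            }
        }
    });
    ChannelReader { rx, current: Vec::new(), pos: 0 }
}
```

The returned ChannelReader is Send, so it could in turn be handed to another thread that copies it into a subprocess's stdin. The cost is an extra copy per chunk and one thread per decoder.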

Why does "the second instance of ZipArchive has to be on the same thread as the first"? They're two separate decoders, right?

For the use case of a zip file within a zip archive: I start with a Read + Send (from e.g. the file system), which is passed into read_zipfile_from_stream, which gives me a Read + !Send. Now that Read needs to go into read_zipfile_from_stream again, but since this new Read is not Send, both instances of zip will now be running on the same thread without being able to change that, since I can't move the inner Read to a different thread. At least I think?

Though I guess those two zip decoders being on the same thread isn't really an issue, since they wouldn't deadlock each other, which is the problem with the adapter that spawns a subprocess...

I'll have to think about whether creating a zip object on a different thread from the start is a solution to my problem.
