First some context to prevent an XY-problem:
I wrote a tool a while back that basically allows grep to work on many different file types: ripgrep-all (phiresky/ripgrep-all on GitHub), "rga: ripgrep, but also search in PDFs, E-Books, Office documents, zip, tar.gz, etc."
The most complicated functionality is being able to recurse down into archives, e.g. extracting the content of a pdf that's inside a zip that's inside a tar archive.
The main architecture was a set of "adapters" with an interface of `adapt(metadata, Box<Read>, Box<Write>) -> ()`, where they just read some binary file format and write out the contents to a write stream.
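To make that interface concrete, here is a rough sketch of what such a trait could look like. The `Metadata` struct, the `Adapter` trait name, and the toy `Uppercase` adapter are placeholders I made up for illustration, not the tool's real types, and I return `io::Result<()>` instead of `()` so errors can propagate.

```rust
use std::io::{self, Read, Write};

/// Hypothetical metadata type; the real fields are not shown here.
pub struct Metadata {
    pub filename: String,
}

/// Sketch of the "read some format, write out its contents" interface.
pub trait Adapter {
    fn adapt<'a>(
        &self,
        meta: &Metadata,
        input: Box<dyn Read + 'a>,
        output: Box<dyn Write + 'a>,
    ) -> io::Result<()>;
}

/// Toy adapter (an assumption for illustration): upper-cases ASCII input.
pub struct Uppercase;

impl Adapter for Uppercase {
    fn adapt<'a>(
        &self,
        _meta: &Metadata,
        mut input: Box<dyn Read + 'a>,
        mut output: Box<dyn Write + 'a>,
    ) -> io::Result<()> {
        let mut buf = [0u8; 8192];
        loop {
            // Read one chunk; zero bytes means end of input.
            let n = input.read(&mut buf)?;
            if n == 0 {
                break;
            }
            buf[..n].make_ascii_uppercase();
            output.write_all(&buf[..n])?;
        }
        output.flush()
    }
}
```

The important property is that the adapter drives both ends itself: it pulls from the reader and pushes into the writer until the input is exhausted.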
In an effort to make it more extensible, I wanted to allow more flexible, user-configurable chaining of adapters that work by spawning external programs. For example, you could have an adapter that converts other character encodings to UTF-8.
To do this, I planned to change the interface to something like `adapt(metadata, Box<Read>) -> Iter<Box<Read>>`, where an adapter wraps the input and returns a set of inner "files" that are `Read`s and can then be passed to more adapters. The adapters include internal adapters written in Rust that e.g. query SQLite databases or decompress archives, plus custom configurable ones that spawn subprocesses.
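A rough sketch of the pull-based interface I have in mind follows. Again, `Metadata`, `Inner`, `Adapter`, and the toy `Uppercase` adapter are made-up names for illustration only; a real archive adapter would yield one item per contained file rather than exactly one.

```rust
use std::io::{self, Read};

/// Hypothetical metadata type; the real fields are not shown here.
pub struct Metadata {
    pub name: String,
}

/// The inner "files" an adapter yields: each one is just another reader.
pub type Inner<'a> =
    Box<dyn Iterator<Item = io::Result<(Metadata, Box<dyn Read + 'a>)>> + 'a>;

/// Sketch of the new interface: wrap an input reader, yield inner readers.
pub trait Adapter {
    fn adapt<'a>(&self, meta: Metadata, input: Box<dyn Read + 'a>) -> Inner<'a>;
}

/// Lazily upper-cases whatever the wrapped reader produces.
struct UpperReader<R>(R);

impl<R: Read> Read for UpperReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.0.read(buf)?;
        buf[..n].make_ascii_uppercase();
        Ok(n)
    }
}

/// Toy adapter (an assumption for illustration) that yields one inner file.
pub struct Uppercase;

impl Adapter for Uppercase {
    fn adapt<'a>(&self, meta: Metadata, input: Box<dyn Read + 'a>) -> Inner<'a> {
        let inner: Box<dyn Read + 'a> = Box::new(UpperReader(input));
        let item: io::Result<(Metadata, Box<dyn Read + 'a>)> = Ok((meta, inner));
        Box::new(std::iter::once(item))
    }
}
```

Because every output is itself a `Read`, the result of one adapter can be fed straight into the next, and nothing is processed until the final consumer starts reading.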
Now the question:
How can I convert a `Read` to a `Write` without blocking the whole thread? For example, the `zip` crate gives me a `Read` for each file in the zip. I want to pass this `Read` to a subprocess calling the `pdftotext` binary, returning the output stream of `pdftotext` as another `Read`.
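To show the shape of the problem, this is roughly the naive version of what I want. The function name `pipe_through_naive` is made up for this post, and in the usage below `cat` stands in for `pdftotext`. It works for small inputs but can hang on large ones, because it writes everything to stdin before anyone reads stdout.

```rust
use std::io::{self, Read};
use std::process::{ChildStdout, Command, Stdio};

/// Naive sketch: push all input into the child's stdin, then return its stdout.
pub fn pipe_through_naive(
    mut input: impl Read,
    program: &str,
    args: &[&str],
) -> io::Result<ChildStdout> {
    let mut child = Command::new(program)
        .args(args)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;
    let mut stdin = child.stdin.take().expect("stdin was piped");

    // Blocks until the whole input is written. Once the child's stdout pipe
    // fills up, the child stops reading stdin and this call never returns.
    io::copy(&mut input, &mut stdin)?;

    // Close stdin so the child sees end-of-file.
    drop(stdin);

    // The child is not waited on here; a real version would have to reap it.
    Ok(child.stdout.take().expect("stdout was piped"))
}
```

With a few bytes of input, e.g. `pipe_through_naive(&b"hello"[..], "cat", &[])`, the returned reader yields the child's output as expected, since everything fits in the OS pipe buffers.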
Here are some ideas I could think of:
- Create a new thread that runs `std::io::copy(readFromZip, subprocess.stdin)`. But the `Read` I get from the zip crate (and other sources) is not `Send`, so I can't move it to a new thread. If I run `std::io::copy` on the main thread, it deadlocks, since the subprocess is waiting for me to read from its stdout before accepting more data on stdin. See this code; the `streaming` test hangs.
- Make the adapter interface async (`async adapt(metadata, input: AsyncBufRead) -> Iter<AsyncBufRead>`). Then everything could stay on the same thread. But it seems like this would complicate my architecture a lot, and many things I use don't support async outputs (e.g. the tar and zip crates).
- Run the whole adapter in a new thread. Then the `Iter` returned by the zip adapter is already in a separate thread from my main logic, and the subsequent adapter can run on the main thread. This doesn't work when chaining two zip adapters though, since the second instance of `ZipArchive` has to be on the same thread as the first.
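For reference, the first idea would look roughly like the sketch below. The name `pipe_through_threaded` is made up for this post, and `cat` again stands in for `pdftotext` in the usage. The `Send + 'static` bound on the input is exactly the requirement that the reader from the zip crate does not meet.

```rust
use std::io::{self, Read};
use std::process::{ChildStdout, Command, Stdio};
use std::thread;

/// Sketch of idea 1: a helper thread feeds stdin while the caller reads stdout.
pub fn pipe_through_threaded<R>(
    mut input: R,
    program: &str,
    args: &[&str],
) -> io::Result<ChildStdout>
where
    R: Read + Send + 'static, // the bound a non-Send reader cannot satisfy
{
    let mut child = Command::new(program)
        .args(args)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;
    let mut stdin = child.stdin.take().expect("stdin was piped");
    let stdout = child.stdout.take().expect("stdout was piped");

    thread::spawn(move || {
        // Copy errors (e.g. a broken pipe if the child exits early) are
        // ignored in this sketch.
        let _ = io::copy(&mut input, &mut stdin);
        // Close stdin so the child sees end-of-file, then reap the child.
        drop(stdin);
        let _ = child.wait();
    });

    Ok(stdout)
}
```

With an input that is `Send`, such as `std::io::repeat(b'x').take(1 << 20)`, this streams a megabyte through `cat` without hanging, because the copy into stdin and the read from stdout proceed concurrently.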
Sorry that this question is somewhat ambiguous. I've basically been stuck on this for a long time and need to figure it out to finally be able to publish a new version of my tool.