Stream data in, compress, and stream out

Hi guys, I want to take a stream, turn it into a zip file, and stream that out, but I'm having problems. Specifically, I don't understand why I get this panic in this minimal example (see the comment):

use async_zip::{
    write::{EntryOptions, ZipFileWriter},
    Compression,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() {
    let dummy_file: &[u8] = b"hello world";

    let (mut compressed_rx, mut compressed_tx) = tokio::io::duplex(1024);
    tokio::spawn(async move {
        let mut writer = ZipFileWriter::new(&mut compressed_tx);
        let mut entry_writer = writer
            .write_entry_stream(EntryOptions::new(
                "simple.txt".to_string(),
                Compression::Stored,
            ))
            .await
            .unwrap();

        entry_writer.write_all(dummy_file).await.unwrap();

        entry_writer.close().await.unwrap(); // panicked at 'called `Result::unwrap()` on an `Err` value: UpstreamReadError(Kind(BrokenPipe))
        writer.close().await.unwrap();
    });

    // "compressed_rx" should have our stream, if successful...
    // but for testing here, let's collect it all into memory and write to file
    let mut output = Vec::new();
    compressed_rx.read_to_end(&mut output).await.unwrap();
    tokio::fs::write("tmp/arif.zip", &output).await.unwrap();
}

Any help is greatly appreciated!!

Note that creating:

let mut file = tokio::fs::File::create("/tmp/foo.zip").await.unwrap();

and passing &mut file to ZipFileWriter::new(), with an .await on the end of the tokio::spawn(), does produce the expected results.

I guess I could probably solve this by streaming writes into a File while simultaneously streaming reads from that same File (I hope/wonder whether it's safe if the client reads from the file faster than the ZipFileWriter writes to it?).

This seems like a very strange solution. For more strangeness, and less file access, maybe send it through a FIFO, a channel, or a named pipe.

Yes! Absolutely, sorry, when I said "solve" i meant, just progress for now.

This is not optimal, because now the user has to wait for the zip file to be created in its entirety before they can begin downloading it.

You can see from my example that I very much want to use a channel (compressed_tx), but using it causes a panic that I can't understand right now. I think one of the two ends, rx/tx, is dropping early?

tokio::spawn creates a new green thread (task), but it does not enforce any ordering between the spawned task and the surrounding code.

Looking at your code, it is not guaranteed that the spawned task has finished before the call to compressed_rx.read_to_end.

UpstreamReadError means the client side (in this case, compressed_rx) raised an error in the middle of reading a buffer.

'AsyncReadExt::read_to_end' calls 'poll_read' in a loop. Let's say that 'entry_writer' tries to close while 'compressed_rx' is still reading from the buffer. 'entry_writer.close' internally calls 'AsyncWriteExt::shutdown' on the underlying writer; during the shutdown, the buffer gets flushed and dropped.

That's where the problem comes in. Note that 'compressed_rx' is still reading from the buffer, but the buffer was already dropped by '.close()'. The read fails, and 'entry_writer' returns an error.

This error propagates and reaches '.close()', since the internal writer (=compressed_tx) waits for a Result from its counterpart (=compressed_rx).

One solution would be explicit manipulation of the JoinHandle returned by tokio::spawn:

let join_handle = tokio::spawn(async move {
    // ...
    entry_writer
});

// ...

let mut output = Vec::new();
compressed_rx.read_to_end(&mut output).await.unwrap();
let entry_writer = join_handle.await.unwrap();
entry_writer.close().await.unwrap();
writer.close().await.unwrap();
tokio::fs::write("tmp/arif.zip", &output).await.unwrap();

The lifetimes bother me, but since every captured variable must be 'static because of tokio::spawn's bounds, I believe there would be no lifetime issue.

Thank you very much for your effort. The problem is that I can't move entry_writer into a new task because it mutably borrows writer, and if I want to compress more than one file, I'm fooked.

I have, however, solved it! It turned out there was a bug in the library. The solution is in the next post.

The wizards in the Discord (namely @jakubdabek and @Nemo157) figured out that there is a bug in the async_zip library that causes my duplex stream to be shut down prematurely. Wrapping it in a newtype that prevents it from being shut down prematurely fixes it! Wonderful.

Here is the complete solution for anyone looking to do something similar in the future:

// Cargo.toml:
// [dependencies]
// async_zip = "0.0.6"
// tokio = { version = "1", features = ["full"] }

use std::{pin::Pin, task::Poll};

use async_zip::{
    write::{EntryOptions, ZipFileWriter},
    Compression,
};
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt, DuplexStream};

struct DuplexWrapper(DuplexStream);

impl DuplexWrapper {
    fn into_inner(self) -> DuplexStream {
        self.0
    }
}

impl AsyncWrite for DuplexWrapper {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        Pin::new(&mut self.0).poll_write(cx, buf)
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        Pin::new(&mut self.0).poll_flush(cx)
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        // Ignore the shutdown request. This will fix the bug with async_zip/async_compression that accidentally
        // shuts down our duplexes!
        Poll::Ready(Ok(()))
    }
}

#[tokio::main]
async fn main() {
    // Prepare a stream that will receive the compressed bytes
    let (compressed_rx, compressed_tx) = tokio::io::duplex(1024);
    // Wrap the streams in a newtype to protect them from getting shut down
    // prematurely by the async_zip library. This is a bug, and will be fixed soon.
    // FIXME: when the bug is fixed, this won't be necessary
    let compressed_rx = DuplexWrapper(compressed_rx);
    let mut compressed_tx = DuplexWrapper(compressed_tx);

    tokio::spawn(async move {
        // Prepare our ZipFileWriter
        let mut zip_archive = ZipFileWriter::new(&mut compressed_tx);

        // Get a list of hashes and paths that we need to compress
        let files_to_zip = get_files_to_zip();

        for (hash, path) in files_to_zip {
            let (mut uncompressed_rx, mut uncompressed_tx) = tokio::io::duplex(1024);

            let mut entry_writer = zip_archive
                .write_entry_stream(EntryOptions::new(path, Compression::Deflate))
                .await // Does this await, await for the stream to close, or just the construction of the EntryStreamWriter?
                .unwrap();

            // Begin streaming into the channel
            tokio::spawn(async move {
                stream_file(&hash, &mut uncompressed_tx).await;
            });

            // Copy from channel into the entry_writer
            tokio::io::copy(&mut uncompressed_rx, &mut entry_writer)
                .await
                .unwrap();

            // finalize this file's compression
            entry_writer.close().await.unwrap();
        }

        // When all uncompressed_streams have completed we can close off
        // the ZipFileWriter
        zip_archive.close().await.unwrap();
    });

    // Unwrap the newtype from the early-shutdown workaround commented on earlier
    // FIXME: when bug is fixed, this won't be necessary
    let mut compressed_rx = compressed_rx.into_inner();

    // "compressed_rx" should have our stream, if successful...
    // but for testing here, let's collect it all into memory and write to file
    let mut output = Vec::new();
    compressed_rx.read_to_end(&mut output).await.unwrap();
    tokio::fs::write("output.zip", &output).await.unwrap();
}

fn get_files_to_zip() -> Vec<(String, String)> {
    vec![
        ("1".to_string(), "/first.txt".to_string()),
        ("2".to_string(), "/folder/second.txt".to_string()),
    ]
}

async fn stream_file<W>(hash: &str, writer: &mut W)
where
    W: AsyncWrite + Send + Unpin,
{
    match hash {
        "1" => writer.write_all(b"the content of the first file"),
        "2" => writer.write_all(b"the content of the second file"),
        _ => panic!(),
    }
    .await
    .unwrap();
}