How do I extract a .tar.xz from an actix-multipart upload?

I'm trying to extract a multipart file upload, but unfortunately there seems to be a dearth of information on actix-multipart. It seems like I get a series of Bytes chunks, but every xz/lzma and tar library I can find requires a single input passed to a new method, so I'm completely stumped as to what I can actually do with the chunks actix gives back.

I got to this point with the one bit of example code on actix-multipart, but have no idea how to proceed from here:

#[post("/upload")]
async fn upload(mut multipart: Multipart) -> Result<HttpResponse, Error> {
    while let Ok(Some(mut field)) = multipart.try_next().await {
        let content_disposition = field.content_disposition().unwrap();
        let name = content_disposition.get_name().unwrap();
        // We're only interested in the "payload" field. Ignore everything else.
        if name != "payload" {
            continue;
        }
        web::block(|| fs::create_dir_all("./out"))
            .await
            .unwrap();

        // Field in turn is stream of *Bytes* object
        while let Some(chunk) = field.next().await {
            let data = chunk.unwrap();
            // What do I do with this bytes object????
        }
    }
    Ok(HttpResponse::Accepted().json(json!({"status": "success"})))
}

Well, for example, to read the full field into a Vec<u8>, you can do this:

let mut full = Vec::new();
// Field in turn is stream of *Bytes* object
while let Some(chunk) = field.next().await {
    let data = chunk.unwrap();
    full.extend_from_slice(&data);
}
// use full

A Bytes object is basically an Arc<Vec<u8>> with some extra capabilities.
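
To make the Arc comparison a bit more concrete, here is a small std-only sketch. It is only an analogy and says nothing about how the bytes crate is implemented internally; share_buffer is a name made up for this example.

```rust
use std::sync::Arc;

/// Returns two handles to the same heap buffer, mimicking how cloning a
/// reference-counted byte buffer shares the data instead of copying it.
/// (Hypothetical helper, for illustration only.)
fn share_buffer(data: Vec<u8>) -> (Arc<Vec<u8>>, Arc<Vec<u8>>) {
    let first = Arc::new(data);
    let second = Arc::clone(&first); // bumps the reference count only
    (first, second)
}

fn main() {
    let (a, b) = share_buffer(vec![1, 2, 3, 4]);
    // Both handles point at the same allocation.
    assert!(Arc::ptr_eq(&a, &b));
    assert_eq!(Arc::strong_count(&a), 2);

    // Borrowing as a slice is all that `extend_from_slice` needs.
    let mut full = Vec::new();
    full.extend_from_slice(&a[..2]);
    assert_eq!(full, vec![1, 2]);
}
```

The point is that passing such a handle around is cheap, while collecting every chunk into one Vec still copies all of the data into memory.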

That will cache the entire file in memory though, right? The upload is potentially quite large so I'm trying to decompress and untar the file contents to the disk somewhere as the bytes come in. lzma and tar both operate sequentially so this seems like it should be possible.

You can decompress the xz part using the async-compression crate, but the tar libraries I know don't integrate well with async/await.

You are going to have to wrap the handling of the tar file in a web::block. Luckily in our case, you also need to interact with the filesystem, which requires use of web::block in any case.

To do this, first define the following utility:

use tokio::runtime::Handle;
use tokio::io::{AsyncRead, AsyncReadExt};
use std::io::{Read, Result};

pub struct BlockingRead<R> {
    inner: R,
    handle: Handle,
}

impl<R: AsyncRead + Unpin> BlockingRead<R> {
    pub fn new(r: R) -> Self {
        Self {
            inner: r,
            handle: Handle::current(),
        }
    }
}

impl<R: AsyncRead + Unpin> Read for BlockingRead<R> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        self.handle.block_on(self.inner.read(buf))
    }
}

You can now unpack the archive like this:

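use futures::stream::TryStreamExt; // for .map_err

// The field yields actix_multipart::MultipartError, which stream_reader
// cannot use directly; convert it to std::io::Error here.
fn multipart_to_io(err: actix_multipart::MultipartError) -> std::io::Error {
    todo!()
}

let body = tokio::io::stream_reader(field.map_err(multipart_to_io));
let xz = async_compression::tokio_02::bufread::XzDecoder::new(body);
let blocking = BlockingRead::new(xz);

// Unpacking is blocking work, so run it on the blocking thread pool.
web::block(move || {
    let mut archive = tar::Archive::new(blocking);
    archive.unpack("./out")
})
.await?;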

This uses the tar crate.

Thanks, but unfortunately I can't get your code to compile. It looks like the multipart field can't be sent across threads, so it gives this error on the web::block line:

error[E0277]: `std::rc::Rc<std::cell::RefCell<actix_multipart::server::InnerField>>` cannot be sent between threads safely
   --> src/main.rs:31:5
    |
31  |       web::block(move || {
    |  _____^^^^^^^^^^_-
    | |     |
    | |     `std::rc::Rc<std::cell::RefCell<actix_multipart::server::InnerField>>` cannot be sent between threads safely
32  | |         let mut archive = tar::Archive::new(blocking);
33  | |         archive.unpack("./out")
34  | |     });
    | |_____- within this `[closure@src/main.rs:31:16: 34:6 blocking:upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>]`
    | 
   ::: /home/<user>/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-web-3.0.2/src/web.rs:279:35
    |
279 |       F: FnOnce() -> Result<I, E> + Send + 'static,
    |                                     ---- required by this bound in `actix_web::web::block`
    |
    = help: within `[closure@src/main.rs:31:16: 34:6 blocking:upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::RefCell<actix_multipart::server::InnerField>>`
    = note: required because it appears within the type `actix_multipart::Field`
    = note: required because it appears within the type `futures::stream::IntoStream<actix_multipart::Field>`
    = note: required because it appears within the type `futures::stream::Map<futures::stream::IntoStream<actix_multipart::Field>, futures_util::fns::MapErrFn<[closure@src/main.rs:27:23: 27:36]>>`
    = note: required because it appears within the type `futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>`
    = note: required because it appears within the type `tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>`
    = note: required because it appears within the type `async_compression::tokio_02::bufread::generic::decoder::Decoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>, async_compression::codec::xz::decoder::XzDecoder>`
    = note: required because it appears within the type `async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>`
    = note: required because it appears within the type `upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>`
    = note: required because it appears within the type `[closure@src/main.rs:31:16: 34:6 blocking:upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>]`

error[E0277]: `std::rc::Rc<std::marker::PhantomData<bool>>` cannot be sent between threads safely
   --> src/main.rs:31:5
    |
31  |       web::block(move || {
    |  _____^^^^^^^^^^_-
    | |     |
    | |     `std::rc::Rc<std::marker::PhantomData<bool>>` cannot be sent between threads safely
32  | |         let mut archive = tar::Archive::new(blocking);
33  | |         archive.unpack("./out")
34  | |     });
    | |_____- within this `[closure@src/main.rs:31:16: 34:6 blocking:upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>]`
    | 
   ::: /home/<user>/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-web-3.0.2/src/web.rs:279:35
    |
279 |       F: FnOnce() -> Result<I, E> + Send + 'static,
    |                                     ---- required by this bound in `actix_web::web::block`
    |
    = help: within `[closure@src/main.rs:31:16: 34:6 blocking:upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::marker::PhantomData<bool>>`
    = note: required because it appears within the type `actix_multipart::server::Safety`
    = note: required because it appears within the type `actix_multipart::Field`
    = note: required because it appears within the type `futures::stream::IntoStream<actix_multipart::Field>`
    = note: required because it appears within the type `futures::stream::Map<futures::stream::IntoStream<actix_multipart::Field>, futures_util::fns::MapErrFn<[closure@src/main.rs:27:23: 27:36]>>`
    = note: required because it appears within the type `futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>`
    = note: required because it appears within the type `tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>`
    = note: required because it appears within the type `async_compression::tokio_02::bufread::generic::decoder::Decoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>, async_compression::codec::xz::decoder::XzDecoder>`
    = note: required because it appears within the type `async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>`
    = note: required because it appears within the type `upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>`
    = note: required because it appears within the type `[closure@src/main.rs:31:16: 34:6 blocking:upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>]`

error[E0277]: `std::rc::Rc<std::cell::Cell<bool>>` cannot be sent between threads safely
   --> src/main.rs:31:5
    |
31  |       web::block(move || {
    |  _____^^^^^^^^^^_-
    | |     |
    | |     `std::rc::Rc<std::cell::Cell<bool>>` cannot be sent between threads safely
32  | |         let mut archive = tar::Archive::new(blocking);
33  | |         archive.unpack("./out")
34  | |     });
    | |_____- within this `[closure@src/main.rs:31:16: 34:6 blocking:upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>]`
    | 
   ::: /home/<user>/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-web-3.0.2/src/web.rs:279:35
    |
279 |       F: FnOnce() -> Result<I, E> + Send + 'static,
    |                                     ---- required by this bound in `actix_web::web::block`
    |
    = help: within `[closure@src/main.rs:31:16: 34:6 blocking:upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::Cell<bool>>`
    = note: required because it appears within the type `actix_multipart::server::Safety`
    = note: required because it appears within the type `actix_multipart::Field`
    = note: required because it appears within the type `futures::stream::IntoStream<actix_multipart::Field>`
    = note: required because it appears within the type `futures::stream::Map<futures::stream::IntoStream<actix_multipart::Field>, futures_util::fns::MapErrFn<[closure@src/main.rs:27:23: 27:36]>>`
    = note: required because it appears within the type `futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>`
    = note: required because it appears within the type `tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>`
    = note: required because it appears within the type `async_compression::tokio_02::bufread::generic::decoder::Decoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>, async_compression::codec::xz::decoder::XzDecoder>`
    = note: required because it appears within the type `async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>`
    = note: required because it appears within the type `upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>`
    = note: required because it appears within the type `[closure@src/main.rs:31:16: 34:6 blocking:upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>]`

error[E0277]: `std::rc::Rc<()>` cannot be sent between threads safely
   --> src/main.rs:31:5
    |
31  |       web::block(move || {
    |  _____^^^^^^^^^^_-
    | |     |
    | |     `std::rc::Rc<()>` cannot be sent between threads safely
32  | |         let mut archive = tar::Archive::new(blocking);
33  | |         archive.unpack("./out")
34  | |     });
    | |_____- within this `[closure@src/main.rs:31:16: 34:6 blocking:upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>]`
    | 
   ::: /home/<user>/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-web-3.0.2/src/web.rs:279:35
    |
279 |       F: FnOnce() -> Result<I, E> + Send + 'static,
    |                                     ---- required by this bound in `actix_web::web::block`
    |
    = help: within `[closure@src/main.rs:31:16: 34:6 blocking:upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<()>`
    = note: required because it appears within the type `std::marker::PhantomData<std::rc::Rc<()>>`
    = note: required because it appears within the type `actix_utils::task::LocalWaker`
    = note: required because it appears within the type `actix_multipart::server::Safety`
    = note: required because it appears within the type `actix_multipart::Field`
    = note: required because it appears within the type `futures::stream::IntoStream<actix_multipart::Field>`
    = note: required because it appears within the type `futures::stream::Map<futures::stream::IntoStream<actix_multipart::Field>, futures_util::fns::MapErrFn<[closure@src/main.rs:27:23: 27:36]>>`
    = note: required because it appears within the type `futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>`
    = note: required because it appears within the type `tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>`
    = note: required because it appears within the type `async_compression::tokio_02::bufread::generic::decoder::Decoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>, async_compression::codec::xz::decoder::XzDecoder>`
    = note: required because it appears within the type `async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>`
    = note: required because it appears within the type `upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>`
    = note: required because it appears within the type `[closure@src/main.rs:31:16: 34:6 blocking:upload::utils::BlockingRead<async_compression::tokio_02::bufread::XzDecoder<tokio::io::StreamReader<futures::stream::MapErr<actix_multipart::Field, [closure@src/main.rs:27:23: 27:36]>, actix_web::web::Bytes>>>]`

error: aborting due to 4 previous errors; 4 warnings emitted

For more information about this error, try `rustc --explain E0277`.
error: could not compile `upload`.

To learn more, run the command again with --verbose.
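
For anyone reading along, the core of that error can be reproduced with std alone. This is a minimal sketch, not actix code: assert_send and run_elsewhere are made-up names, and run_elsewhere only imitates the Send + 'static bound that the error reports for web::block.

```rust
use std::sync::Arc;

/// Compiles only for types that may be moved to another thread.
fn assert_send<T: Send>() {}

/// Runs `job` on a separate thread. The `Send + 'static` bounds are the
/// same kind of requirement the compiler reports for `web::block`.
/// (Hypothetical helper, for illustration only.)
fn run_elsewhere<F, T>(job: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    std::thread::spawn(job).join().expect("worker thread panicked")
}

fn main() {
    assert_send::<Vec<u8>>();
    assert_send::<Arc<Vec<u8>>>();
    // assert_send::<std::rc::Rc<Vec<u8>>>(); // would fail with E0277

    // Owned bytes can be moved into the closure and processed elsewhere.
    let bytes = vec![1u8, 2, 3];
    let total = run_elsewhere(move || bytes.iter().map(|b| *b as u32).sum::<u32>());
    assert_eq!(total, 6);
}
```

Anything containing an Rc, like the multipart field here, fails the same bound, while plain byte buffers pass it.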

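Okay, that's pretty annoying. The field holds Rc values, so it has to stay on the handler's thread; forward its chunks through a channel instead. Try something like this: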

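use futures::stream::StreamExt; // for .next

// Convert the field's error type into std::io::Error for stream_reader.
fn multipart_to_io(err: actix_multipart::MultipartError) -> std::io::Error {
    todo!()
}

// Bounded channel: the field stays on this thread, only the chunks cross over.
// tokio 0.2's Sender::send takes &mut self, hence `mut send`.
let (mut send, recv) = tokio::sync::mpsc::channel(8);

let body = tokio::io::stream_reader(recv);
let xz = async_compression::tokio_02::bufread::XzDecoder::new(body);
let blocking = BlockingRead::new(xz);

let ((), b) = tokio::join!(
    async {
        while let Some(item) = field.next().await {
            let _ = send.send(item.map_err(multipart_to_io)).await;
        }
        drop(send); // close the channel so the reader sees end of input
    },
    web::block(move || {
        let mut archive = tar::Archive::new(blocking);
        archive.unpack("./out")
    }),
);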
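
If the moving parts are hard to follow, the same producer/consumer shape can be sketched with std only: one side pushes chunks into a bounded channel, the other side reads them through a blocking Read. ChannelReader and pump are hypothetical names for this illustration, and a thread stands in for web::block; this is a sketch of the idea rather than a replacement for the tokio code.

```rust
use std::io::{self, Read};
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Adapts a channel of byte chunks into a blocking `Read`.
/// (Hypothetical helper, not part of any crate.)
struct ChannelReader {
    recv: Receiver<io::Result<Vec<u8>>>,
    current: Vec<u8>, // chunk currently being drained
    pos: usize,       // read offset into `current`
}

impl ChannelReader {
    fn new(recv: Receiver<io::Result<Vec<u8>>>) -> Self {
        Self { recv, current: Vec::new(), pos: 0 }
    }
}

impl Read for ChannelReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Fetch chunks until one has data; a closed channel means EOF.
        while self.pos == self.current.len() {
            match self.recv.recv() {
                Ok(chunk) => {
                    self.current = chunk?;
                    self.pos = 0;
                }
                Err(_) => return Ok(0),
            }
        }
        let n = buf.len().min(self.current.len() - self.pos);
        buf[..n].copy_from_slice(&self.current[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Feeds `chunks` through a bounded channel to a consumer thread and
/// returns everything the consumer read.
fn pump(chunks: Vec<Vec<u8>>) -> io::Result<Vec<u8>> {
    let (send, recv) = sync_channel(8);
    let consumer = thread::spawn(move || {
        let mut out = Vec::new();
        ChannelReader::new(recv).read_to_end(&mut out).map(|_| out)
    });
    for chunk in chunks {
        // A send error means the consumer stopped early; stop producing.
        if send.send(Ok(chunk)).is_err() {
            break;
        }
    }
    drop(send); // closing the channel signals end of input
    consumer.join().expect("consumer thread panicked")
}

fn main() -> io::Result<()> {
    let data = pump(vec![b"hello ".to_vec(), Vec::new(), b"world".to_vec()])?;
    assert_eq!(data, b"hello world".to_vec());
    Ok(())
}
```

The bounded channel gives backpressure: at most eight chunks are buffered, so the upload is never held in memory as a whole. In the real handler, the consumer would hand the reader to the decompressor and tar::Archive instead of read_to_end.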

Note that what you posted is not the full error. Prefer to post output from cargo build rather than IDE popup text, as it is a lot more useful to me when helping you.
