Upload and download with Axum, streaming

I'm trying to write a small axum router that uploads and downloads files.

I would like to stream those files directly to the cloud, but I don't know what to use for streaming.

In Go (my primary language) I'm using http.ResponseWriter with io.Copy.

But in Rust I'm having real difficulty understanding what to use. This is the code:

  • Cargo.toml:
[package]
name = "upload"
version = "0.1.0"
edition = "2021"

[dependencies]
axum = { version = "0.5.17", features = ["multipart"] }
futures = "0.3.25"
tokio = { version = "1.21.2", features = ["full"] }
tokio-util = { version = "0.7.4", features = ["io"] }
rust-s3 = "0.32.3"
  • main.rs:
use axum::{
    body::{boxed, StreamBody},
    extract::{Multipart, Query},
    http::StatusCode,
    response::Response,
    routing::{on, MethodFilter},
    Router,
};
use futures::TryStreamExt;
use s3::{creds::Credentials, Bucket};
use std::{io, net::SocketAddr, pin::Pin};
use tokio::io::{AsyncRead, AsyncWrite, BufWriter};
use tokio_util::io::StreamReader;

#[tokio::main]
async fn main() {
    let router = Router::new()
        .route("/upload", on(MethodFilter::POST, upload))
        .route("/download/*key", on(MethodFilter::GET, download));

    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));

    axum::Server::bind(&addr)
        .serve(router.into_make_service())
        .await
        .unwrap();
}

pub async fn upload(mut multipart: Multipart) -> Result<Response, StatusCode> {
    while let Some(field) = multipart.next_field().await.unwrap() {
        let filename = if let Some(filename) = field.file_name() {
            filename.to_string()
        } else {
            continue;
        };

        let bucket = Bucket::new(
            "test",
            "us-east-1".parse().unwrap(),
            Credentials::default().unwrap(),
        )
        .unwrap();

        let body_with_io_error = field.map_err(|err| io::Error::new(io::ErrorKind::Other, err));

        let body_reader = StreamReader::new(body_with_io_error);

        futures::pin_mut!(body_reader);

        // Without `.await` the future returned by put_file is never run.
        put_file(bucket, &filename, body_reader).await.unwrap();

        return Ok(Response::builder()
            .status(StatusCode::CREATED)
            .body(boxed("OK".to_string()))
            .unwrap());
    }

    Err(StatusCode::INTERNAL_SERVER_ERROR)
}

async fn put_file(
    bucket: Bucket,
    filename: &str,
    mut reader: Pin<&mut (dyn AsyncRead + Send)>,
) -> Result<(), ()> {
    bucket
        .put_object_stream(&mut reader, filename)
        .await
        .unwrap();

    Ok(())
}

pub async fn download(Query(params): Query<Vec<(String, String)>>) -> Result<Response, StatusCode> {
    let filename = params[0].1.to_string();

    let bucket = Bucket::new(
        "test",
        "us-east-1".parse().unwrap(),
        Credentials::default().unwrap(),
    )
    .unwrap();

    // I DON'T KNOW HOW TO START HERE! HELP!!! :smile:
    // What should I use here?

    let writer = BufWriter::new();
    // let writer = ReaderStream::new(reader);
    // futures::pin_mut!(writer);

    get_file(bucket, &filename, writer).await.unwrap();

    let response = Response::builder()
        .body(boxed(StreamBody::new(writer)))
        .unwrap();

    Ok(response)
}

async fn get_file(
    bucket: Bucket,
    filename: &str,
    mut writer: Pin<&mut (dyn AsyncWrite + Send)>,
) -> Result<(), ()> {
    bucket.get_object_stream(filename, &mut writer).await?;

    Ok(())
}

QUESTIONS

  1. The upload function works but I don't know what these lines mean:

    let body_with_io_error = field.map_err(|err| io::Error::new(io::ErrorKind::Other, err));
    let body_reader = StreamReader::new(body_with_io_error);
    futures::pin_mut!(body_reader);
    
  2. The download function doesn't work because I don't know how to create the writer that get_object_stream() needs.

Addressing your questions first:

let body_with_io_error = field.map_err(|err| io::Error::new(io::ErrorKind::Other, err));

↑ This line converts any error produced by the multipart field into an io::Error, which is the error type that StreamReader expects.

let body_reader = StreamReader::new(body_with_io_error);

↑ This line wraps the multipart field, which is a stream of byte chunks, in a StreamReader, turning it into an AsyncRead.

futures::pin_mut!(body_reader);

↑ This line pins the reader on the stack. The resulting Pin<&mut _> is always Unpin, which satisfies the Unpin bound in the signature of put_object_stream:

pub async fn put_object_stream<R: AsyncRead + Unpin>(
    &self,
    reader: &mut R,
    s3_path: impl AsRef<str>
) -> Result<u16, S3Error>

You can read more about pinning here.
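To make the pinning step a bit more concrete, here is a minimal std-only sketch. NotUnpin, require_unpin and pinned_reference_is_unpin are hypothetical names invented for this illustration, and std::pin::pin! is used in place of futures::pin_mut! only to avoid a dependency; both pin a value on the stack.

```rust
use std::marker::PhantomPinned;
use std::pin::{pin, Pin};

/// Hypothetical stand-in for a reader type that is not `Unpin`.
#[derive(Default)]
pub struct NotUnpin {
    pub bytes_read: usize,
    _pin: PhantomPinned,
}

/// Accepts only values whose type is `Unpin`, like the `R: Unpin` bound above.
pub fn require_unpin<T: Unpin>(_value: &T) -> bool {
    true
}

/// Pins a `NotUnpin` on the stack and hands the pinned reference to `require_unpin`.
pub fn pinned_reference_is_unpin() -> bool {
    // `require_unpin(&NotUnpin::default())` would be rejected by the compiler.
    let pinned: Pin<&mut NotUnpin> = pin!(NotUnpin::default());
    // `Pin<&mut T>` is `Unpin` for every `T`, so this call is accepted.
    require_unpin(&pinned)
}
```

The point is that the Unpin bound ends up being checked against the pinned reference, not against the reader type itself.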

Now, getting to your question regarding download:

Let's begin by looking at the signature from get_object_stream:

pub async fn get_object_stream<T: AsyncWrite + Send + Unpin, S: AsRef<str>>(
    &self,
    path: S,
    writer: &mut T
) -> Result<u16, S3Error>

So, we need to satisfy the trait bounds AsyncWrite + Send + Unpin.

When looking into the documentation for tokio's AsyncWrite trait, you can see that it provides some implementations for foreign types. For example:

impl AsyncWrite for Vec<u8, Global>

Which means that we can pass get_object_stream a vector of u8 so that it works as a buffer.

Vec auto-implements the Send trait based on these conditions:

impl<T, A> Send for Vec<T, A>
where
    A: Send,
    T: Send,

And u8 implements Send, so Vec<u8> also implements Send.

As for the Unpin bound, I guess you should know what to do after reading the article that I mentioned before.
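As a rough illustration of using a Vec<u8> as the buffer, here is a synchronous, std-only analogue. It relies on std::io::Write for Vec<u8> rather than tokio's AsyncWrite, so treat it as a sketch of the idea and not as the rust-s3 call itself; require_send_unpin and download_into_buffer are made-up names.

```rust
use std::io::{self, Read};

/// Compile-time check mirroring the `Send + Unpin` part of the bound.
pub fn require_send_unpin<T: Send + Unpin>(_value: &T) {}

/// Copies everything from `source` into an in-memory buffer and returns it.
pub fn download_into_buffer(mut source: impl Read) -> io::Result<Vec<u8>> {
    let mut buffer: Vec<u8> = Vec::new();
    // Compiles only because `Vec<u8>` is both `Send` and `Unpin`.
    require_send_unpin(&buffer);
    // `Vec<u8>` implements `std::io::Write` by appending to itself.
    io::copy(&mut source, &mut buffer)?;
    Ok(buffer)
}
```

Note that the whole object sits in memory before anything can be sent on, so this is buffering rather than streaming.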

Thank you very much. All this was already somewhat clear to me.

What I don't understand is which writer to pass to get_object_stream so that I can return the body to axum as the response (the file download).

I tried using let writer: Vec<u8> = vec![]; but I'm stuck on what to do with the response: Ok(response.body(boxed(StreamBody::new(writer))).unwrap())... 🙁

When you respond with a file, you need to define some response headers as well. Take inspiration from here: Example of how to return a file dynamically · Discussion #608 · tokio-rs/axum · GitHub

Thank you for your help.

The issue I have is not with the browser.

I don't know which writer to pass to get_file().

rust-s3 doesn't appear to have been designed with this kind of use case in mind. The docs for get_object_stream only talk about using it to save an object to a file.

In order to do what you want with rust-s3 you need a type that transforms writes into a stream, but some cursory searching didn't find any crates that do that. There may be some combination of existing types that can do it that I haven't thought of though.
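To show the shape of "a type that transforms writes into a stream", here is a synchronous, std-only sketch built on a bounded channel and a thread. ChannelWriter, chunk_pipe and pipe_round_trip are hypothetical names, and this is not something rust-s3 or axum provides. In async code an in-memory pipe such as tokio::io::duplex, read through tokio_util::io::ReaderStream, might fill the same role, though that combination is not tested here.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Hypothetical writer that forwards every write as a chunk over a channel.
pub struct ChannelWriter {
    sender: SyncSender<Vec<u8>>,
}

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // `send` blocks while the channel is full, which gives back-pressure.
        self.sender
            .send(buf.to_vec())
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "receiver dropped"))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Creates a writer plus the receiving end that yields the written chunks.
pub fn chunk_pipe(capacity: usize) -> (ChannelWriter, Receiver<Vec<u8>>) {
    let (sender, receiver) = sync_channel(capacity);
    (ChannelWriter { sender }, receiver)
}

/// Writes `parts` from a producer thread and collects them on the consumer side.
pub fn pipe_round_trip(parts: Vec<Vec<u8>>) -> Vec<u8> {
    let (mut writer, chunks) = chunk_pipe(2);
    let producer = thread::spawn(move || {
        for part in parts {
            writer.write_all(&part).expect("consumer is still reading");
        }
        // `writer` is dropped here, which closes the channel.
    });
    // Iterating the receiver ends once the writer has been dropped.
    let received: Vec<u8> = chunks.iter().flatten().collect();
    producer.join().expect("producer thread panicked");
    received
}
```

The consumer sees chunks as they are written, so nothing requires the whole object to be in memory at once.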

Thank you. I just found that the beta version of Bucket (s3::bucket) has this method, which returns a ResponseDataStream (s3::request::request_trait).

How can I stream this type, which is defined as:

pub struct ResponseDataStream {
    pub bytes: Pin<Box<dyn Stream<Item = Bytes>>>,
    pub status_code: u16,
}

How can I transform this into my type, Pin<Box<dyn AsyncRead + Send>>?

I don't think you need it to be AsyncRead? Based on your earlier code you should only need

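// Assumes `use futures::StreamExt;` and `use std::convert::Infallible;` are in scope.
let response = Response::builder()
    .body(boxed(StreamBody::new(
        bucket
            .get_object_stream(filename)
            .await
            .unwrap()
            .bytes
            // StreamBody expects a stream of Results, so wrap each chunk in Ok.
            .map(Ok::<_, Infallible>),
    )))
    .unwrap();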

Unfortunately it looks like that beta version doesn't include Send on the bytes field trait object, so that doesn't compile as is, but if you lie to the compiler about the trait object being Send (via a wrapper type or a transmute) it does compile.

Obviously you probably shouldn't use either of those strategies, and the crate should probably include Send in the trait object.
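On the .map(Ok::<_, Infallible>) step in the snippet above: the body type wants items that are Results, while the S3 stream yields bare chunks. A small std-only sketch of the same trick on an ordinary iterator, with wrap_chunks_in_ok as a made-up name and Vec<u8> standing in for Bytes:

```rust
use std::convert::Infallible;

/// Wraps every chunk in `Ok`, turning infallible items into `Result`s.
pub fn wrap_chunks_in_ok<I>(chunks: I) -> impl Iterator<Item = Result<Vec<u8>, Infallible>>
where
    I: Iterator<Item = Vec<u8>>,
{
    // The turbofish names the error type, which cannot be inferred from
    // the input because the input never fails.
    chunks.map(Ok::<_, Infallible>)
}
```

Infallible has no values, so the Err branch can never actually occur; it only exists to make the types line up.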

I'm using two adapters, one for FS:


let path = Path::new(&self.files_dir).join(filename);

Ok(Box::pin(tokio::fs::File::open(path).await?))

and the one for S3.

So I think I found a common type for the Trait signature:


async fn get_file(&self, filename: &str) -> Result<Pin<Box<dyn AsyncRead + Send>>>;

What can I use instead of AsyncRead?

Stream?

Anyway, I suggested to the rust-s3 author that Send be added, but he is not responsive these days. I'll wait.

tokio_util has an adapter for converting from AsyncRead to Stream: tokio_util::io::ReaderStream.
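To give a feel for what such an adapter does, here is a synchronous, std-only analogue that turns a std::io::Read into an iterator of chunks. ReadChunks and read_chunks are hypothetical names for this sketch; the real tokio_util adapter works on AsyncRead and yields a Stream instead.

```rust
use std::io::{self, ErrorKind, Read};

/// Hypothetical adapter that yields the contents of a reader chunk by chunk.
pub struct ReadChunks<R> {
    reader: R,
    chunk_size: usize,
}

/// Wraps `reader`; a `chunk_size` of 0 is bumped to 1 so reads can make progress.
pub fn read_chunks<R: Read>(reader: R, chunk_size: usize) -> ReadChunks<R> {
    ReadChunks {
        reader,
        chunk_size: chunk_size.max(1),
    }
}

impl<R: Read> Iterator for ReadChunks<R> {
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut buf = vec![0u8; self.chunk_size];
        loop {
            match self.reader.read(&mut buf) {
                // A zero-length read signals end of input.
                Ok(0) => return None,
                Ok(n) => {
                    buf.truncate(n);
                    return Some(Ok(buf));
                }
                // Interrupted reads are simply retried.
                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => return Some(Err(e)),
            }
        }
    }
}
```

Each item is a Result, which is the same shape a streaming response body expects from its chunks.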

You could also patch the crate. As long as you aren't publishing your crate to crates.io yet, you wouldn't have to publish the patched version of rust-s3.
