I'm trying to write a small axum router that uploads and downloads files.
I would like to stream those files directly to the cloud, but I don't know what to use for streaming.
In Go (my primary language) I would use http.ResponseWriter with io.Copy.
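For anyone who doesn't know Go, the pattern I mean is roughly what std::io::copy does in blocking Rust. A minimal sketch, where copy_all is a made-up helper name; I assume the async counterpart would be tokio::io::copy working on AsyncRead and AsyncWrite:

```rust
use std::io::{self, Read, Write};

/// Copies everything from `src` into `dst`, similar to Go's `io.Copy(dst, src)`.
/// Returns the number of bytes copied.
/// `copy_all` is a hypothetical helper name used only for illustration.
fn copy_all<R: Read, W: Write>(mut src: R, mut dst: W) -> io::Result<u64> {
    // `io::copy` moves the data through an internal buffer, so the caller
    // never has to load the whole payload into memory first.
    io::copy(&mut src, &mut dst)
}
```

Anything that implements Read can be the source and anything that implements Write can be the destination, which is the same role http.ResponseWriter plays as the destination in Go.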
But in async Rust I'm having real difficulty understanding what to use. This is the code:
- Cargo.toml:
[package]
name = "upload"
version = "0.1.0"
edition = "2021"

[dependencies]
axum = { version = "0.5.17", features = ["multipart"] }
futures = "0.3.25"
tokio = { version = "1.21.2", features = ["full"] }
tokio-util = { version = "0.7.4", features = ["io"] }
rust-s3 = "0.32.3"
- main.rs:
use axum::{
    body::{boxed, StreamBody},
    extract::{Multipart, Query},
    http::StatusCode,
    response::Response,
    routing::{on, MethodFilter},
    Router,
};
use futures::TryStreamExt;
use s3::{creds::Credentials, Bucket};
use std::{io, net::SocketAddr, pin::Pin};
use tokio::io::{AsyncRead, AsyncWrite, BufWriter};
use tokio_util::io::StreamReader;

#[tokio::main]
async fn main() {
    let router = Router::new()
        .route("/upload", on(MethodFilter::POST, upload))
        .route("/download/*key", on(MethodFilter::GET, download));

    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));

    axum::Server::bind(&addr)
        .serve(router.into_make_service())
        .await
        .unwrap();
}
pub async fn upload(mut multipart: Multipart) -> Result<Response, StatusCode> {
    while let Some(field) = multipart.next_field().await.unwrap() {
        // Skip form fields that are not file uploads.
        let filename = if let Some(filename) = field.file_name() {
            filename.to_string()
        } else {
            continue;
        };

        let bucket = Bucket::new(
            "test",
            "us-east-1".parse().unwrap(),
            Credentials::default().unwrap(),
        )
        .unwrap();

        let body_with_io_error = field.map_err(|err| io::Error::new(io::ErrorKind::Other, err));
        let body_reader = StreamReader::new(body_with_io_error);
        futures::pin_mut!(body_reader);

        // The future must be awaited, otherwise the upload never runs.
        put_file(bucket, &filename, body_reader).await.unwrap();

        return Ok(Response::builder()
            .status(StatusCode::CREATED)
            .body(boxed("OK".to_string()))
            .unwrap());
    }

    Err(StatusCode::INTERNAL_SERVER_ERROR)
}
async fn put_file(
    bucket: Bucket,
    filename: &str,
    mut reader: Pin<&mut (dyn AsyncRead + Send)>,
) -> Result<(), ()> {
    bucket
        .put_object_stream(&mut reader, filename)
        .await
        .unwrap();

    Ok(())
}
pub async fn download(Query(params): Query<Vec<(String, String)>>) -> Result<Response, StatusCode> {
    let filename = params[0].1.to_string();

    let bucket = Bucket::new(
        "test",
        "us-east-1".parse().unwrap(),
        Credentials::default().unwrap(),
    )
    .unwrap();

    // I DON'T KNOW HOW TO START HERE! HELP!!! :smile:
    // What should I use here?
    let writer = BufWriter::new();

    // let writer = ReaderStream::new(reader);
    // futures::pin_mut!(writer);

    get_file(bucket, &filename, writer).await.unwrap();

    let response = Response::builder()
        .body(boxed(StreamBody::new(writer)))
        .unwrap();

    Ok(response)
}
async fn get_file(
    bucket: Bucket,
    filename: &str,
    mut writer: Pin<&mut (dyn AsyncWrite + Send)>,
) -> Result<(), ()> {
    bucket.get_object_stream(filename, &mut writer).await?;

    Ok(())
}
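What I think I need in download is a writer whose bytes come out the other side as a stream of chunks that a response body can consume. Below is a blocking, std-only sketch of that shape, not working axum code. ChannelWriter, fake_get_object, and start_download are made-up names, and a thread plus a channel stand in for a Tokio task. In the real handler I assume tokio::io::duplex together with tokio_util::io::ReaderStream could play these roles, but I have not verified that.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Writer half (hypothetical): every `write` call forwards its bytes as one chunk.
struct ChannelWriter {
    tx: Sender<Vec<u8>>,
}

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.tx
            .send(buf.to_vec())
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "receiver dropped"))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        // Nothing is buffered inside the writer itself.
        Ok(())
    }
}

/// Hypothetical stand-in for `get_object_stream`: writes an object into `writer`.
fn fake_get_object<W: Write>(writer: &mut W) -> io::Result<()> {
    writer.write_all(b"part one, ")?;
    writer.write_all(b"part two")?;
    Ok(())
}

/// Starts the producer on its own thread and returns the receiving half,
/// which the caller can iterate over as a stream of chunks.
fn start_download() -> Receiver<Vec<u8>> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        let mut writer = ChannelWriter { tx };
        // When `writer` is dropped at the end of this closure the channel
        // closes, which is what ends the chunk stream on the receiving side.
        let _ = fake_get_object(&mut writer);
    });
    rx
}
```

The point of the split is that the producer and the consumer run concurrently, so the response can start before the whole object has been fetched.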
QUESTIONS
- The upload function works, but I don't know what these lines mean:
  let body_with_io_error = field.map_err(|err| io::Error::new(io::ErrorKind::Other, err));
  let body_reader = StreamReader::new(body_with_io_error);
  futures::pin_mut!(body_reader);
- The download function doesn't work because I don't know what to use or how to create the writer that get_object_stream() needs.
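My rough understanding of the upload lines so far, which may be wrong: map_err converts the multipart error into io::Error, StreamReader then presents the stream of byte chunks as a single reader, and pin_mut! pins that reader on the stack so it can be passed as Pin<&mut ...>. The sketch below mirrors the first two steps with blocking std types only. FieldError, to_io_error, ChunkReader, and read_chunks are made-up names, and pinning has no counterpart here because nothing is async.

```rust
use std::fmt;
use std::io::{self, Read};

/// Hypothetical stand-in for the error type a multipart field can yield.
#[derive(Debug)]
struct FieldError(String);

impl fmt::Display for FieldError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "field error: {}", self.0)
    }
}

impl std::error::Error for FieldError {}

/// Step 1 (the `map_err` line): wrap the library-specific error in `io::Error`.
fn to_io_error(err: FieldError) -> io::Error {
    io::Error::new(io::ErrorKind::Other, err)
}

/// Step 2 (the `StreamReader` line): expose a sequence of chunks as one reader.
struct ChunkReader<I> {
    chunks: I,
    current: Vec<u8>,
    pos: usize,
}

impl<I: Iterator<Item = io::Result<Vec<u8>>>> Read for ChunkReader<I> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Fetch the next chunk once the current one is used up.
        while self.pos == self.current.len() {
            match self.chunks.next() {
                Some(Ok(chunk)) => {
                    self.current = chunk;
                    self.pos = 0;
                }
                Some(Err(e)) => return Err(e),
                None => return Ok(0), // no more chunks: end of input
            }
        }
        let n = buf.len().min(self.current.len() - self.pos);
        buf[..n].copy_from_slice(&self.current[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Reads a whole chunk sequence through the adapter and returns the bytes.
fn read_chunks(chunks: Vec<Result<Vec<u8>, FieldError>>) -> io::Result<Vec<u8>> {
    let with_io_error = chunks.into_iter().map(|c| c.map_err(to_io_error));
    let mut reader = ChunkReader { chunks: with_io_error, current: Vec::new(), pos: 0 };
    let mut out = Vec::new();
    reader.read_to_end(&mut out)?;
    Ok(out)
}
```

If that reading is right, the error conversion is needed only because the reader adapter has to report failures as io::Error, and the chunk boundaries disappear once the data is read through it.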