Writing large file from hyper 1.0.1 server POST body

Hello I would like to write a large file from the body of a HTTP POST request. I am using the new hyper 1.0.1 crate for implementing the HTTP server and I want to do this in an async context. I am currently stuck on figuring out what structs/functions to use.

This implementation needs to buffer the data and write the file out in parts since it is not possible to load the entire body into memory before writing to a file. Looking around in tokio, hyper, http_body_util, and tokio_util documentation there are several stucts and traits that can be used to accomplish this. From what I can understand tokio::io::copy seems to be what I want to use based on the info from this stackoverflow post (rust - How to write a hyper response body to a file? - Stack Overflow ) which is based on the older pre 1.0 hyper. A reference to the body of the request is obtained by let body = req.body() which gives me an &Incoming type. I can turn this into a BodyStream using http_body_util::BodyStream::new(body);
The problem is that tokio::io::copy needs to have a reader that implements the tokio::io::AsyncRead trait which BodyStream does not. I am unable to find a way to do convert a BodyStream into something that has the required trait. Are there any other ways to write the body into a file?

This is my current code:

This is the compile error:

Davidpdrsn from the Tokio discord gave me a working solution. The problem with my code was that the body needs to be owned in order for it to stream it. This was the solution given to me:

use std::io;

use bytes::Bytes;
use futures::prelude::*;
use http::{Request, Response};
use http_body_util::{BodyStream, Empty};
use hyper::body::Incoming;
use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
    loop {
        let (socket, _remote_addr) = listener.accept().await.unwrap();
        tokio::spawn(async move {
            let socket = TokioIo::new(socket);
            let hyper_service = hyper::service::service_fn(handler);
            if let Err(err) = server::conn::auto::Builder::new(TokioExecutor::new())
                .serve_connection(socket, hyper_service)
                .await
            {
                eprintln!("failed to serve connection: {err:#}");
            }
        });
    }
}

async fn handler(request: Request<Incoming>) -> Result<Response<Empty<Bytes>>, io::Error> {
    let body = request.into_body();
    let stream_of_frames = BodyStream::new(body);
    let stream_of_bytes = stream_of_frames
        .try_filter_map(|frame| async move { Ok(frame.into_data().ok()) })
        .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
    let async_read = tokio_util::io::StreamReader::new(stream_of_bytes);
    let mut async_read = std::pin::pin!(async_read);

    let mut destination = tokio::io::stdout();

    tokio::io::copy(&mut async_read, &mut destination).await?;

    destination.flush().await?;

    Ok(Response::new(Empty::new()))
}
[package]
name = "test-binary"
version = "0.1.0"
edition = "2021"

[dependencies]
bytes = "1.5.0"
futures = "0.3.29"
http = "1.0"
http-body-util = "0.1.0"
hyper = "1.0"
hyper-util = { version = "0.1", features = ["tokio", "server-auto", "http1"] }
tokio = { version = "1.27.0", features = ["full"] }
tokio-util = { version = "0.7.10", features = ["io"] }

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.