(Also cross-posted on StackOverflow.)
I have a Rust app which is running a Hyper server and uses zipit to handle on-the-fly zip archive creation. Everything works perfectly, but the only problem is that it seems to only handle 6 streams at once. Any other requests hang until another stream is done.
use hyper::service::{make_service_fn, service_fn};
use hyper::{header, Body, Request, Response, Server, StatusCode};
use tokio::io::{duplex, AsyncWriteExt};
use tokio_util::io::ReaderStream;
use zipit::{Archive, FileDateTime};
use futures::stream::TryStreamExt;
async fn zip_archive(_req: Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
println!("Entering function");
let file_urls = vec![
// vec of &str
];
let (w, r) = duplex(16 * 1024);
tokio::spawn(async move {
let mut archive = Archive::new(w);
for (idx, url) in file_urls.iter().enumerate() {
let bytes = reqwest::get(url.to_string())
.await.unwrap()
.bytes_stream().map_err(std::io::Error::other);
let mut reader = tokio_util::io::StreamReader::new(bytes);
// Append stream to archive
archive.append(
format!("{}.mp3", idx),
FileDateTime::now(),
&mut reader)
.await.unwrap();
}
archive.finalize().await.unwrap();
});
println!("Returning");
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/zip")
.body(Body::wrap_stream(ReaderStream::new(r)))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let address = ([127, 0, 0, 1], 8080).into();
let service =
make_service_fn(|_| async { Ok::<_, hyper::http::Error>(service_fn(zip_archive)) });
let server = Server::bind(&address).serve(service);
println!("Listening on http://{}", address);
server.await?;
Ok(())
}
I'm a bit new to Rust, so apologies if there's a super simple mistake that's causing all of this.
I thought at first maybe this was somehow related to the duplex - even though it should be unique per request - so I increased it's size to 16kbs, but it changed nothing and still hangs at 6 streams.