Hi all,
I'm trying to stream large files from an actix-web service to multiple endpoints and am having issues with the endpoints disconnecting early.
fakeshadow over in the Actix-web Discord channel was kind enough to help me get a simple streaming endpoint working, but it fails when I start streaming files larger than 3 MB.
Here's the code:
use std::{convert::Infallible, error::Error};

use actix_web::{http::header, web, HttpRequest, HttpResponse};
use futures::{channel::mpsc::unbounded, SinkExt};
use reqwest::{
    header::{CONNECTION, CONTENT_LENGTH, CONTENT_TYPE},
    Body, Client,
};
use tokio_stream::StreamExt;

pub async fn send_multiple(
    request: HttpRequest,
    client: web::Data<Client>,
    mut payload: web::Payload,
) -> Result<HttpResponse, Box<dyn Error>> {
    let content_length = match get_content_length(&request) {
        Some(cl) => {
            println!("Content-Length: {}", cl);
            cl
        }
        None => return Ok(HttpResponse::BadRequest().body("Invalid CONTENT_LENGTH header.")),
    };

    let urls = [
        "http://localhost:5001",
        "http://localhost:5001",
        "http://localhost:5001",
    ];

    let reqs = (0..3)
        .map(|index| {
            let cli = client.clone();
            let (tx, rx) = unbounded::<Result<_, Infallible>>();
            let handle = tokio::spawn(async move {
                let response = cli
                    .post(urls[index])
                    .header(CONNECTION, "keep-alive")
                    .header(CONTENT_LENGTH, content_length)
                    .header(CONTENT_TYPE, "application/octet-stream")
                    .body(Body::wrap_stream(rx))
                    .send()
                    .await;
                if let Err(err) = &response {
                    println!("Response Error: {:?}", err);
                }
                response
            });
            (handle, tx)
        })
        .collect::<Vec<_>>();

    while let Some(chunk) = payload.next().await {
        let chunk = chunk?;
        for (_, tx) in reqs.iter() {
            let mut tx = tx.clone();
            let chunk = chunk.clone();
            match tx.send(Ok(chunk)).await {
                Ok(()) => {}
                Err(err) => {
                    println!("TaskJoinError: {:?}", err);
                    // return Err(err.into());
                }
            }
        }
    }

    for (handle, _) in reqs {
        match handle.await {
            Ok(r) => match r {
                Ok(_res) => {} //res,
                Err(err) => {
                    println!("RequestError: {:?}", err);
                    // return Err(err.into());
                }
            },
            Err(err) => {
                println!("JoinError: {:?}", err);
                // return Err(err.into());
            }
        }
        // println!("Response: Status - {}", response.status());
    }

    Ok(HttpResponse::Ok().finish())
}

fn get_content_length(request: &HttpRequest) -> Option<usize> {
    match request.headers().get(header::CONTENT_LENGTH) {
        Some(fs_header) => match fs_header.to_str() {
            Ok(fs_header_str) => match fs_header_str.parse::<usize>() {
                Ok(s) => Some(s),
                Err(err) => {
                    println!("Unable to parse header::CONTENT_LENGTH: {:?}", err);
                    None
                }
            },
            Err(err) => {
                println!("Unable to parse header::CONTENT_LENGTH: {:?}", err);
                None
            }
        },
        None => None,
    }
}
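For reproduction purposes, a minimal receiving endpoint can be sketched like this. This is a simplified stand-in, not my actual test service — it just drains the body and counts bytes (assumes actix-web 4):

```rust
use actix_web::{web, App, HttpResponse, HttpServer};
use tokio_stream::StreamExt;

// Drain the request body chunk by chunk and report how many bytes arrived.
async fn receive(mut payload: web::Payload) -> HttpResponse {
    let mut total = 0usize;
    while let Some(chunk) = payload.next().await {
        match chunk {
            Ok(bytes) => total += bytes.len(),
            Err(err) => {
                println!("Payload error: {:?}", err);
                return HttpResponse::InternalServerError().finish();
            }
        }
    }
    println!("Received {} bytes", total);
    HttpResponse::Ok().finish()
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/", web::post().to(receive)))
        .bind(("127.0.0.1", 5001))?
        .run()
        .await
}
```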
I get the same error no matter what language the endpoints are written in; I've tried endpoints in both ASP.NET and Rust.
...
TaskJoinError: SendError { kind: Disconnected }
RequestError: reqwest::Error { kind: Request, url: Url { scheme: "http", cannot_be_a_base: false, username: "", password: None, host: Some(Domain("localhost")), port: Some(5001), path: "/", query: None, fragment: None }, source: hyper::Error(Io, Os { code: 10054, kind: ConnectionReset, message: "An existing connection was forcibly closed by the remote host." }) }
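If it helps to take actix out of the picture on the sending side, a standalone reqwest sender along these lines should exercise the same `Body::wrap_stream` path. This is an untested sketch, not a confirmed reproduction — it assumes something is listening on localhost:5001 and pushes 4 MB through the same unbounded-channel setup:

```rust
use std::convert::Infallible;
use futures::{channel::mpsc::unbounded, SinkExt};
use reqwest::{
    header::{CONTENT_LENGTH, CONTENT_TYPE},
    Body, Client,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (mut tx, rx) = unbounded::<Result<Vec<u8>, Infallible>>();
    let total = 4 * 1024 * 1024; // 4 MB, past the ~3 MB failure point

    // Feed the body in 64 KB chunks from a separate task, mirroring the
    // fan-out code above. The sender is dropped when the task ends, which
    // closes the stream.
    let feeder = tokio::spawn(async move {
        for _ in 0..(total / (64 * 1024)) {
            tx.send(Ok(vec![0u8; 64 * 1024])).await.unwrap();
        }
    });

    let response = Client::new()
        .post("http://localhost:5001")
        .header(CONTENT_LENGTH, total)
        .header(CONTENT_TYPE, "application/octet-stream")
        .body(Body::wrap_stream(rx))
        .send()
        .await?;

    feeder.await?;
    println!("Status: {}", response.status());
    Ok(())
}
```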
Has anyone else had any luck streaming large files with Actix and Reqwest?