How to stream actix request payload to multiple endpoints

Hi all,

I'm trying to stream large files from an actix-web service to multiple endpoints and am having issues with the endpoints disconnecting early.

fakeshadow in the actix-web Discord channel kindly helped me get a simple streaming endpoint working, but it fails as soon as I stream files larger than 3 MB.

Here's the code:

use std::{convert::Infallible, error::Error};

use actix_web::{http::header, web, HttpRequest, HttpResponse};
use futures::{channel::mpsc::unbounded, SinkExt};
use reqwest::{
    header::{CONNECTION, CONTENT_LENGTH, CONTENT_TYPE},
    Body, Client,
};
use tokio_stream::StreamExt;

pub async fn send_multiple(
    request: HttpRequest,
    client: web::Data<Client>,
    mut payload: web::Payload,
) -> Result<HttpResponse, Box<dyn Error>> {
    let content_length = match get_content_length(&request) {
        Some(cl) => {
            println!("Content-Length: {}", cl);
            cl
        }
        None => return Ok(HttpResponse::BadRequest().body("Invalid CONTENT_LENGTH header.")),
    };

    let urls = [
        "http://localhost:5001",
        "http://localhost:5001",
        "http://localhost:5001",
    ];

    let reqs = (0..3)
        .map(|index| {
            let cli = client.clone();
            let (tx, rx) = unbounded::<Result<_, Infallible>>();
            let handles = tokio::spawn(async move {
                let response = cli
                    .post(urls[index])
                    .header(CONNECTION, "keep-alive")
                    .header(CONTENT_LENGTH, content_length)
                    .header(CONTENT_TYPE, "application/octet-stream")
                    .body(Body::wrap_stream(rx))
                    .send()
                    .await;

                if let Err(err) = &response {
                    println!("Response Error: {:?}", err);
                }

                response
            });
            (handles, tx)
        })
        .collect::<Vec<_>>();

    while let Some(chunk) = payload.next().await {
        let chunk = chunk?;

        for (_, tx) in reqs.iter() {
            let mut tx = tx.clone();
            let chunk = chunk.clone();
            match tx.send(Ok(chunk)).await {
                Ok(()) => {}
                Err(err) => {
                    // Despite the label, this is a channel SendError (the receiver was dropped), not a JoinError.
                    println!("TaskJoinError: {:?}", err);
                    // return Err(err.into());
                }
            }
        }
    }

    for (handle, _) in reqs {
        let response = match handle.await {
            Ok(r) => match r {
                Ok(res) => {} //res,
                Err(err) => {
                    println!("RequestError: {:?}", err);
                    // return Err(err.into());
                }
            },
            Err(err) => {
                println!("JoinError: {:?}", err);
                // return Err(err.into());
            }
        };

        // println!("Response: Status - {}", response.status());
    }

    Ok(HttpResponse::Ok().finish())
}

fn get_content_length(request: &HttpRequest) -> Option<usize> {
    match request.headers().get(header::CONTENT_LENGTH) {
        Some(fs_header) => match fs_header.to_str() {
            Ok(fs_header_str) => match fs_header_str.parse::<usize>() {
                Ok(s) => Some(s),
                Err(err) => {
                    println!("Unable to parse header::CONTENT_LENGTH: {:?}", err);
                    None
                }
            },
            Err(err) => {
                println!("Unable to parse header::CONTENT_LENGTH: {:?}", err);
                None
            }
        },
        None => None,
    }
}
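
As an aside, the header parsing above can be written more compactly with `Option` combinators. This is only a minimal sketch: `parse_content_length` is a hypothetical name, it assumes the raw header text has already been pulled out (for example with `request.headers().get(header::CONTENT_LENGTH).and_then(|v| v.to_str().ok())`), and it drops the logging that the original helper does.

```rust
/// Parse a raw Content-Length header value into a byte count.
/// Returns None when the header is missing or is not a valid unsigned integer.
/// (Hypothetical helper; it takes the header text directly so it has no actix dependency.)
fn parse_content_length(raw: Option<&str>) -> Option<usize> {
    // `?` returns None for a missing header; `.ok()` turns a parse error into None.
    raw?.parse::<usize>().ok()
}
```

Like the original, it treats anything that is not a plain non-negative integer as invalid.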

I get the same error regardless of what the receiving endpoints are written in. I've tried both ASP.NET and Rust endpoints.

...
TaskJoinError: SendError { kind: Disconnected }
RequestError: reqwest::Error { kind: Request, url: Url { scheme: "http", cannot_be_a_base: false, username: "", password: None, host: Some(Domain("localhost")), port: Some(5001), path: "/", query: None, fragment: None }, source: hyper::Error(Io, Os { code: 10054, kind: ConnectionReset, message: "An existing connection was forcibly closed by the remote host." }) }
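
A note on reading the first line: as far as I understand, `SendError { kind: Disconnected }` only says that the receiving half of the channel has been dropped. Here that is most likely a symptom of the request having already failed (the reset in the second line) and dropped its body stream, rather than a separate problem. The sketch below illustrates that behaviour using `std::sync::mpsc` as a stand-in for the `futures` channel used above (a swapped-in channel type, not the same API); `fan_out` is a hypothetical helper that stops sending to any sink whose receiver is gone.

```rust
use std::sync::mpsc::Sender;

/// Send every chunk to every sink that is still connected.
/// A sink whose receiver has been dropped is removed on the first failed send.
/// Returns how many sinks were still connected after the last chunk.
/// (Hypothetical helper using std channels as a stand-in for futures::channel::mpsc.)
fn fan_out(chunks: &[Vec<u8>], mut sinks: Vec<Sender<Vec<u8>>>) -> usize {
    for chunk in chunks {
        // `send` fails only when the receiving half has been dropped,
        // so a failure means that endpoint's consumer is gone.
        sinks.retain(|tx| tx.send(chunk.clone()).is_ok());
    }
    sinks.len()
}
```

Dropping a dead sink keeps the remaining endpoints receiving data; whether that is the right policy, or whether one failure should abort the whole upload, depends on the use case.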

Has anyone had any luck streaming with Actix and Reqwest?
