After I call actix_web::client::ClientRequest::send_stream, how can I tell that the connection has been established successfully?
I am sending an endless chunked stream as the request body, so no HTTP response is expected unless an error occurs.
// Unbounded channel: `rx_body` is handed to `send_stream` below as the request body;
// `tx` is the matching sender half.
let (tx, rx_body) = mpsc::unbounded();
// Value passed to the connector's `timeout` setting.
let connect_timeout = Duration::from_secs(2);
let connector = Connector::new()
.timeout(connect_timeout)
.conn_keep_alive(Duration::from_millis(100))
.conn_lifetime(Duration::from_secs(3600))
.finish();
// Build a client around that connector and start a PUT whose body is the stream.
let request = ClientBuilder::new()
.no_default_headers()
.disable_timeout() // we will not receive a response
.connector(connector)
.finish()
.put(uri)
.header(http::header::CACHE_CONTROL, "no-cache")
.header(http::header::CONTENT_TYPE, "application/mqtt")
.send_stream(rx_body);
// Await the request. As observed, this does not return until there is a response
// or the connection attempt times out, so success cannot be detected from here.
let result = request.compat().await;
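When the server cannot be reached, request.compat().await
does not return until the connection attempt times out.
Is there a way to be notified when the HTTP connection has been established successfully?
Currently I use a crude workaround: I wait slightly longer than the connect timeout and assume the connection succeeded if the request has not failed by then.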
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use actix_web::http;
use actix_web::client::{ ClientBuilder, Connector};
use futures01::sync::mpsc;
use futures::compat::{Stream01CompatExt, Future01CompatExt};
use futures::stream::TryStreamExt; // for `try_next`
use futures::future::{FutureExt, TryFutureExt};
use bytes::Bytes;
use tokio::timer::delay;
async fn upload(uri : &str) {
let (tx, rx_body) = mpsc::unbounded();
let connect_timeout = Duration::from_secs(2);
let connector = Connector::new()
.timeout(connect_timeout)
.conn_keep_alive(Duration::from_millis(100))
.conn_lifetime(Duration::from_secs(3600))
.finish();
let request = ClientBuilder::new()
.no_default_headers()
.disable_timeout() // we will not receive a response
.connector(connector)
.finish()
.put(uri)
.header(http::header::CACHE_CONTROL, "no-cache")
.header(http::header::CONTENT_TYPE, "application/mqtt")
.send_stream(rx_body);
info!("Establishing an upload connection to {}", uri);
let mut connection_completed = false;
let future = async move {
// we don't know when connection succeeds, hence wait for a duration a bit longer than connection timeout
let when = tokio::clock::now() + connect_timeout + Duration::from_millis(100);
delay(when).await;
if connection_completed {
// some error occurs and connection failed
// ...
}
else {
// connection is established
// register this connection
// ..
loop {
// send data
// ...
}
}
};
actix::Arbiter::spawn(future.unit_error().boxed_local().compat());
// send the request, this call does not return until there is a response or connection timeout
let result = request.compat().await;
connection_completed = true;
// ...
}
/// Kicks off an upload to `url` in the background.
///
/// Takes ownership of `url` so the spawned task can borrow it for the whole
/// duration of `upload`, then hands the task to `actix::Arbiter::spawn`.
pub fn start(url: String) {
    let upload_task = async move { upload(&url).await };
    let legacy_task = upload_task.unit_error().boxed_local().compat();
    actix::Arbiter::spawn(legacy_task);
}