Actix-web: how can I tell when an HTTP connection has been established successfully?

After I call actix_web::client::ClientRequest::send_stream, how can I know that the connection has been established successfully?

I am sending an endless chunked stream as the request body; no HTTP response is expected unless an error occurs.

// Unbounded channel: `rx_body` becomes the streamed request body, `tx` is the
// producer side that feeds it.
let (tx, rx_body) = mpsc::unbounded();

// Connection tuning, named once so the builder chain below reads declaratively.
let connect_timeout = Duration::from_secs(2);
let keep_alive = Duration::from_millis(100);
let lifetime = Duration::from_secs(3600);

let connector = Connector::new()
    .timeout(connect_timeout)
    .conn_keep_alive(keep_alive)
    .conn_lifetime(lifetime)
    .finish();

// Build a one-off client and immediately turn it into a streaming PUT.
// The request timeout is disabled because no response is expected.
let request = ClientBuilder::new()
    .no_default_headers()
    .disable_timeout()
    .connector(connector)
    .finish()
    .put(uri)
    .header(http::header::CACHE_CONTROL, "no-cache")
    .header(http::header::CONTENT_TYPE, "application/mqtt")
    .send_stream(rx_body);

// Drive the request: this only resolves once a response arrives or the
// connection attempt times out.
let result = request.compat().await;

When the client is unable to connect to the server, request.compat().await does not return until the connection attempt times out.
Is there a way to be notified when the HTTP connection has been established successfully?
Currently I use a crude workaround: I wait for the connect timeout to elapse.

use actix_web::http;
use actix_web::client::{ ClientBuilder, Connector};
use futures01::sync::mpsc;
use futures::compat::{Stream01CompatExt, Future01CompatExt};
use futures::stream::TryStreamExt; // for `try_next`
use futures::future::{FutureExt, TryFutureExt};
use bytes::Bytes;
use std::time::Duration;
use tokio::timer::delay;


/// Opens a streaming `PUT` request to `uri` and spawns a companion task that,
/// shortly after the connect timeout has elapsed, decides whether the
/// connection is usable.
///
/// The companion task cannot observe the connection directly, so it sleeps for
/// `connect_timeout + 100ms` and then checks a flag that this function sets
/// once the request future has resolved. If the flag is already set by then,
/// the request ended (failed) within the connect window; otherwise the
/// connection is assumed to be established.
///
/// The flag is shared through `Rc<Cell<bool>>`. A plain `bool` would be
/// *copied* into the `async move` block, so the spawned task would never see
/// the later assignment and would always take the "connected" branch.
async fn upload(uri: &str) {

    let (tx, rx_body) = mpsc::unbounded();

    let connect_timeout = Duration::from_secs(2);

    let connector = Connector::new()
        .timeout(connect_timeout)
        .conn_keep_alive(Duration::from_millis(100))
        .conn_lifetime(Duration::from_secs(3600))
        .finish();

    let request = ClientBuilder::new()
        .no_default_headers()
        .disable_timeout()  // we will not receive a response
        .connector(connector)
        .finish()
        .put(uri)
        .header(http::header::CACHE_CONTROL, "no-cache")
        .header(http::header::CONTENT_TYPE, "application/mqtt")
        .send_stream(rx_body);

    info!("Establishing an upload connection to {}", uri);

    // Shared between this function and the spawned task. `Rc`/`Cell` is
    // sufficient because the task is spawned as a local (non-`Send`) future.
    let connection_completed = std::rc::Rc::new(std::cell::Cell::new(false));
    let completed_flag = std::rc::Rc::clone(&connection_completed);

    let future = async move {

        // we don't know when connection succeeds, hence wait for a duration a bit longer than connection timeout
        let when = tokio::clock::now() + connect_timeout + Duration::from_millis(100);
        delay(when).await;

        if completed_flag.get() {
            // some error occurs and connection failed
            // ...
        }
        else {
            // connection is established

            // register this connection
            // ..

            // NOTE: whatever goes in this loop must `.await`; a loop without
            // an await point never yields back to the executor.
            loop {
                // send data
                // ...
            }
        }

    };
    actix::Arbiter::spawn(future.unit_error().boxed_local().compat());

    // send the request, this call does not return until there is a response or connection timeout
    let result = request.compat().await;
    connection_completed.set(true);

    // ...

}

/// Kicks off an upload to `url` in the background.
///
/// The `upload` future is handed to `actix::Arbiter::spawn`, so this function
/// returns without waiting for the upload to make progress.
pub fn start(url: String) {
    // `url` is moved into the task so the future owns everything it borrows.
    let task = async move { upload(&url).await };
    let spawned = task.unit_error().boxed_local().compat();
    actix::Arbiter::spawn(spawned);
}

You could use the type below, which will notify a oneshot channel when the stream is first polled.

use futures::stream::Stream;
use futures::sync::oneshot::{channel, Sender, Receiver};
use futures::Poll;

/// Stream adapter that sends a oneshot notification the first time it is
/// polled and otherwise forwards every poll to the wrapped stream.
pub struct NotifyStream<S> {
    /// The wrapped stream that produces the actual items.
    inner: S,
    /// Sender half of the notification channel; `Some` until the first poll
    /// takes it, `None` afterwards.
    send: Option<Sender<()>>,
}

impl<S: Stream> NotifyStream<S> {
    /// Wraps `inner` in a `NotifyStream`.
    ///
    /// Returns the adapter together with the receiving half of a oneshot
    /// channel; the sending half is stored in the adapter.
    pub fn new(inner: S) -> (Self, Receiver<()>) {
        let (notifier, notified) = channel();
        let wrapped = Self {
            inner,
            send: Some(notifier),
        };
        (wrapped, notified)
    }
}

impl<S: Stream> Stream for NotifyStream<S> {
    type Item = S::Item;
    type Error = S::Error;
    fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
        if let Some(send) = self.send.take() {
            // Ignore errors as they just mean the receiver was dropped.
            let _ = send.send(());
        }
        self.inner.poll()
    }
}
2 Likes

Thanks @alice, with NotifyStream I can now listen for the events.

/// Streams an open-ended `PUT` body to `uri` and reports connection lifecycle
/// events from a companion task.
///
/// The request body is wrapped in `utils::NotifyStream`, whose receiver fires
/// once the body stream is first polled; a second oneshot channel tells the
/// companion task when the request future has resolved. The function itself
/// resolves when the request produces a response or an error, logging the
/// error if there is one.
async fn upload(uri: &str) {
    // `tx` is the producer side of the body stream; `rx_body` is the stream itself.
    let (tx, rx_body) = mpsc::unbounded();

    let connect_timeout = Duration::from_secs(2);
    let keep_alive = Duration::from_millis(100);
    let lifetime = Duration::from_secs(3600);

    let connector = Connector::new()
        .timeout(connect_timeout)
        .conn_keep_alive(keep_alive)
        .conn_lifetime(lifetime)
        .finish();

    // No request timeout: a response is not expected for this stream.
    let client = ClientBuilder::new()
        .no_default_headers()
        .disable_timeout()
        .connector(connector)
        .finish();

    // `rx_connected` resolves the first time the body stream gets polled.
    let (body, rx_connected) = utils::NotifyStream::new(rx_body);

    // Streaming PUT request.
    let request = client
        .put(uri)
        .header(http::header::CACHE_CONTROL, "no-cache")
        .header(http::header::CONTENT_TYPE, "application/mqtt")
        .send_stream(body);

    info!("PUT {}", uri);

    // Signals the watcher task below that the request future has resolved.
    let (tx_done, rx_done) = channel();

    let watcher = async move {
        // Both futures are fused so `select!` can keep looping after one of
        // them has already completed.
        let connected = rx_connected.compat().fuse();
        let finished = rx_done.compat().fuse();
        pin_mut!(connected, finished);

        loop {
            select! {
                notified = connected => {
                    if notified.is_ok() {
                        println!("Connection connected {:?}", notified);
                        // TO DO : register tx
                    }
                },
                _ = finished => {
                    println!("Connection completed");
                    // TO DO : unregister tx
                    return;
                },
            };
        }
    };
    actix::Arbiter::spawn(watcher.unit_error().boxed_local().compat());

    // Drive the request; this only resolves on a response or an error.
    let outcome = request.compat().await;

    // The watcher may already be gone, in which case the send error is irrelevant.
    let _ = tx_done.send(());

    if let Err(err) = outcome {
        warn!("PUT {} : {}", uri, err);
    }
}

Note that you don't have to use the old version of the oneshot future. You can replace it with the new version to skip the compat call.