Actix-web : how to know HTTP connection is established successfully?

After I call actix_web::client::ClientRequest::send_stream, how can I know the connection is established successfully.

I am sending an endless chunked stream in request, there is no HTTP response expected unless an error occurs.

let (tx, rx_body) = mpsc::unbounded();
    
let connect_timeout = Duration::from_secs(2);

let connector = Connector::new()
    .timeout(connect_timeout)
    .conn_keep_alive(Duration::from_millis(100))
    .conn_lifetime(Duration::from_secs(3600))
    .finish();

let request = ClientBuilder::new()
    .no_default_headers()
    .disable_timeout()  // we will not receive a response
    .connector(connector)
    .finish()
    .put(uri)
    .header(http::header::CACHE_CONTROL, "no-cache")
    .header(http::header::CONTENT_TYPE, "application/mqtt")
    .send_stream(rx_body);

// send the request, this call does not return until there is a response or connection timeout
let result = request.compat().await;

When server is unable to connect, request.compat().await does not return until the connection timed out.
Is there a way to receive the event of a successful HTTP connection?
Currently I am using a stupid way to wait for timeout.

use actix_web::http;
use actix_web::client::{ ClientBuilder, Connector};
use futures01::sync::mpsc;
use futures::compat::{Stream01CompatExt, Future01CompatExt};
use futures::stream::TryStreamExt; // for `try_next`
use futures::future::{FutureExt, TryFutureExt};
use bytes::Bytes;
use std::time::Duration;
use tokio::timer::delay;


async fn upload(uri : &str) {

    let (tx, rx_body) = mpsc::unbounded();
    
    let connect_timeout = Duration::from_secs(2);

    let connector = Connector::new()
        .timeout(connect_timeout)
        .conn_keep_alive(Duration::from_millis(100))
        .conn_lifetime(Duration::from_secs(3600))
        .finish();

    let request = ClientBuilder::new()
        .no_default_headers()
        .disable_timeout()  // we will not receive a response
        .connector(connector)
        .finish()
        .put(uri)
        .header(http::header::CACHE_CONTROL, "no-cache")
        .header(http::header::CONTENT_TYPE, "application/mqtt")
        .send_stream(rx_body);

    info!("Establishing an upload connection to {}", uri);
    let mut connection_completed = false;
    let future = async move {

        // we don't know when connection succeeds, hence wait for a duration a bit longer than connection timeout
        let when = tokio::clock::now() + connect_timeout + Duration::from_millis(100);
        delay(when).await;

        if connection_completed {
            // some error occurs and connection failed
            // ...
        }
        else {
            // connection is established

            // register this connection
            // ..

            loop {
                // send data
                // ...
            } 
        }

    };
    actix::Arbiter::spawn(future.unit_error().boxed_local().compat());

    // send the request, this call does not return until there is a response or connection timeout
    let result = request.compat().await;
    connection_completed = true;

    // ...
   
}

pub fn start(url : String){
    let future = async move  {
        upload(&url).await;
    };
    actix::Arbiter::spawn(future.unit_error().boxed_local().compat());
}

You could use the type below, which will notify a oneshot channel when the stream is first polled.

use futures::stream::Stream;
use futures::sync::oneshot::{channel, Sender, Receiver};
use futures::Poll;

pub struct NotifyStream<S> {
    inner: S,
    send: Option<Sender<()>>,
}

impl<S: Stream> NotifyStream<S> {
    pub fn new(inner: S) -> (Self, Receiver<()>) {
        let (send, recv) = channel();
        (NotifyStream {
            inner,
            send: Some(send),
        }, recv)
    }
}

impl<S: Stream> Stream for NotifyStream<S> {
    type Item = S::Item;
    type Error = S::Error;
    fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
        if let Some(send) = self.send.take() {
            // Ignore errors as they just mean the receiver was dropped.
            let _ = send.send(());
        }
        self.inner.poll()
    }
}
4 Likes

Thanks @alice, with NotifyStream, I can now listen on the events.

async fn upload(uri : &str) {

    let (tx, rx_body) = mpsc::unbounded();
    
    let connect_timeout = Duration::from_secs(2);

    let connector = Connector::new()
        .timeout(connect_timeout)
        .conn_keep_alive(Duration::from_millis(100))
        .conn_lifetime(Duration::from_secs(3600))
        .finish();

    let client = ClientBuilder::new()
        .no_default_headers()
        .disable_timeout()  // we will not receive a response
        .connector(connector)
        .finish();

    // rx_ready will be signaled once stream is first polled.
    let (stream, rx_ready) = utils::NotifyStream::new(rx_body);

    // PUT streaming
    let request = client.put(uri)
        .header(http::header::CACHE_CONTROL, "no-cache")
        .header(http::header::CONTENT_TYPE, "application/mqtt")
        .send_stream(stream);

    info!("PUT {}", uri);

    let (tx_done, rx_done) = channel();

    let future = async move {
        let ready = rx_ready.compat().fuse();
        let done = rx_done.compat().fuse();
        pin_mut!(ready, done);

        loop {
            select! {
                result = ready => { 
                    if result.is_ok() {
                        println!("Connection connected {:?}", result); 
                        // TO DO : register tx
                    }
                },
                _ = done => {
                    println!("Connection completed"); 
                    // TO DO : unregister tx
                    return;
                },
            };
        }
    };
    actix::Arbiter::spawn(future.unit_error().boxed_local().compat());

    // send the request, this call does not return until there is a response or error
    let result = request.compat().await;
    let _ = tx_done.send(());

    if let Err(e) = result {
        warn!("PUT {} : {}", uri, e);
        return;
    }
   
}

Note that you don't have to use the old version of the oneshot future. You can replace it with the new version to skip the compat call.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.