How does this block of code work?

This is a simplified version of an example from the async-tungstenite examples directory:

    async fn handle_connection(
        s: async_channel::Sender<Msg_To_Render_Server>,
        raw_stream: TcpStream,
        addr: SocketAddr,
    ) {
        println!("Incoming TCP connection from: {}", addr);

        let ws_stream = async_tungstenite::accept_async(raw_stream)
            .await
            .expect("Error during the websocket handshake occurred");
        println!("WebSocket connection established: {}", addr);

        let (outgoing, incoming) = ws_stream.split();

        let broadcast_incoming = incoming
            .try_filter(|msg| future::ready(!msg.is_close()))
            .try_for_each(|msg| {
                println!("Received a message from {}", addr,);
                println!("data: {:?}", msg.into_data());
                future::ok(())
            });

        pin_mut!(broadcast_incoming);
        broadcast_incoming.await;

        println!("{} disconnected", &addr);
    }

The code 'works' in that I am getting all the printlns. Here is what I am confused about: how is this broadcast_incoming.await correlated with the ws closing? I see the try_filter / try_for_each, and I expected broadcast_incoming.await to complete on the first item that passes through. Yet it appears to run until the ws closes.

What is going on?

Alternatively, could someone help rewrite this block as a for loop or a while loop? I'm having trouble figuring out what this maps to.

The try_ stream methods stop when an item fails (that is, when the stream yields an Err), not when an item succeeds. It's roughly equivalent to:

    while let Some(next) = incoming.next().await {
        match next {
            Ok(msg) => {
                // filter
                if !future::ready(!msg.is_close()).await {
                    continue;
                }
                // for_each
                println!("Received a message from {}", addr,);
                println!("data: {:?}", msg.into_data());
                future::ok(()).await;
            }
            Err(e) => {
                return Err(e);
            }
        }
    }
    Ok(())
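
The stop-on-first-error behaviour can also be seen without any async machinery. The sketch below is a synchronous analogue only: it uses the standard library's Iterator::try_for_each rather than the futures crate, and the `drain` helper and its sample data are hypothetical names made up for illustration.

```rust
// Synchronous analogue of TryStreamExt::try_for_each, std only.
// `drain` is a hypothetical helper: it records every Ok value it visits
// and returns the overall result of the loop.
fn drain(items: Vec<Result<i32, String>>) -> (Vec<i32>, Result<(), String>) {
    let mut seen = Vec::new();
    let result = items
        .into_iter()
        .try_for_each(|item| -> Result<(), String> {
            let value = item?; // an Err item ends the whole loop early
            seen.push(value); // an Ok item is processed, then the loop goes on
            Ok(())
        });
    (seen, result)
}

fn main() {
    let items = vec![Ok(1), Ok(2), Err("boom".to_string()), Ok(3)];
    let (seen, result) = drain(items);
    // Items after the first Err are never visited.
    println!("seen = {:?}, result = {:?}", seen, result);
}
```

Every Ok item is visited in turn; the loop only ends when the input is exhausted or an Err shows up. That matches the original observation: broadcast_incoming.await does not finish after the first message, it finishes when the stream ends or fails.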

I understand that, for pedagogical reasons, you are making a literal transliteration. In practice, is the continue the same as a break, in that on the next iteration incoming.next().await returns None?

I'm not sure I follow the question, but I think not. If the predicate evaluates to false, the loop skips the current element and moves on to the next one. It's the same as Iterator::filter, but with futures.

If you want it to break/end when msg.is_close(), then use try_take_while.
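
To make the skip-versus-stop difference concrete, here is a small synchronous sketch using the standard library's Iterator::filter and Iterator::take_while as stand-ins for try_filter and try_take_while. The `Msg` enum and the two helper functions are hypothetical and only mimic the is_close() check; they are not the tungstenite types.

```rust
// Hypothetical stand-in for a WebSocket message (not the tungstenite type).
#[derive(Debug, Clone, PartialEq)]
enum Msg {
    Text(&'static str),
    Close,
}

impl Msg {
    fn is_close(&self) -> bool {
        matches!(self, Msg::Close)
    }
}

// filter: skips close frames but keeps consuming the rest of the input.
fn with_filter(msgs: &[Msg]) -> Vec<Msg> {
    msgs.iter().filter(|m| !m.is_close()).cloned().collect()
}

// take_while: stops for good at the first close frame.
fn with_take_while(msgs: &[Msg]) -> Vec<Msg> {
    msgs.iter().take_while(|m| !m.is_close()).cloned().collect()
}

fn main() {
    let msgs = [Msg::Text("a"), Msg::Close, Msg::Text("b")];
    println!("filter:     {:?}", with_filter(&msgs));
    println!("take_while: {:?}", with_take_while(&msgs));
}
```

With filter, the item after the close frame is still processed; with take_while, nothing after it is. One difference to keep in mind when moving back to the futures crate: the predicate passed to try_take_while resolves to a Result<bool, _> rather than a plain bool, so the check would be written roughly as future::ok(!msg.is_close()).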
