This is simplified from an example in the async-tungstenite examples directory:
async fn handle_connection(
    s: async_channel::Sender<Msg_To_Render_Server>,
    raw_stream: TcpStream,
    addr: SocketAddr,
) {
    println!("Incoming TCP connection from: {}", addr);
    let ws_stream = async_tungstenite::accept_async(raw_stream)
        .await
        .expect("Error during the websocket handshake occurred");
    println!("WebSocket connection established: {}", addr);
    let (outgoing, incoming) = ws_stream.split();
    let broadcast_incoming = incoming
        .try_filter(|msg| future::ready(!msg.is_close()))
        .try_for_each(|msg| {
            println!("Received a message from {}", addr);
            println!("data: {:?}", msg.into_data());
            future::ok(())
        });
    pin_mut!(broadcast_incoming);
    broadcast_incoming.await;
    println!("{} disconnected", &addr);
}
The code 'works' in that I am getting all the printlns. Here is what I am confused about: how is broadcast_incoming.await correlated with the websocket closing? I see the try_filter / try_for_each chain, and I expected broadcast_incoming.await to complete as soon as the first item passes through. Yet it appears to run until the websocket closes.
What is going on?
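For comparison, here is a small synchronous sketch using only std's Iterator::try_for_each. It is an analogy, not the async-tungstenite code: I am assuming the stream version (TryStreamExt::try_for_each in the futures crate) follows the same contract, and the drain function and the string messages are made-up stand-ins for the websocket stream. In this sketch, returning Ok(()) from the closure only means "continue with the next item"; the call returns once the source is exhausted or an item is an Err.

```rust
/// Hypothetical stand-in for the websocket loop: feeds every message to the
/// closure and reports how many were handled plus the final result.
fn drain(messages: Vec<Result<&'static str, String>>) -> (usize, Result<(), String>) {
    let mut handled = 0;
    let result = messages
        .into_iter()
        .try_for_each(|msg| -> Result<(), String> {
            // An Err item short-circuits the whole loop.
            let msg = msg?;
            // Mirrors the try_filter on is_close(): skip close messages.
            if msg != "close" {
                println!("data: {:?}", msg);
                handled += 1;
            }
            // Ok(()) means "keep going", not "finished".
            Ok(())
        });
    (handled, result)
}

fn main() {
    // Every item is visited; the call returns only when the source runs out.
    let (handled, result) = drain(vec![Ok("a"), Ok("b"), Ok("close")]);
    println!("handled {} messages, result: {:?}", handled, result);

    // An error ends the loop early; the trailing "b" is never seen.
    let (handled, result) = drain(vec![Ok("a"), Err("reset".to_string()), Ok("b")]);
    println!("handled {} messages, result: {:?}", handled, result);
}
```

If the analogy holds, a websocket stream has no "end" until the connection closes or errors, which would match the behavior I am seeing.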