Bidirectional communication with tokio streams by example

I am a beginner, very excited to learn how Rust combines the concept of futures with the concept of "green threads". At the moment, though, I am struggling to understand the async-client.rs example from the websockets-rs crate. I would appreciate some help.

In the example (code) there are 2 streams: one with user input (received via an mpsc channel from another thread and mapped into OwnedMessage), and a second one with messages coming from the websocket server (also OwnedMessage).

But then there is a piece of code that confused me:

let (sink, stream) = duplex.split();
stream
    .filter_map(|message| {
        println!("Received Message: {:?}", message);
        match message {
            OwnedMessage::Close(e) => Some(OwnedMessage::Close(e)),
            OwnedMessage::Ping(d) => Some(OwnedMessage::Pong(d)),
            _ => None,
        }
    })
    .select(stdin_ch.map_err(|_| WebSocketError::NoDataAvailable))
    .forward(sink)

My understanding of the code is that the 2 streams are combined (in a round-robin fashion) via select, and then every message (from the two streams) is forward'ed to sink (the websocket server). This understanding is obviously wrong, because this would mean that whenever the client receives a message from server, it bounces it back. I sniffed the wire and this is not what is happening.

My guess is that I am simply misreading the documentation of Stream, but here is what it says (emphasis is mine):

[The select] combinator will attempt to pull items from both streams. Each stream will be polled in a round-robin fashion, and whenever a stream is ready to yield an item that item is yielded.
This future [produced by forward] will drive the stream to keep producing items until it is exhausted, sending each item to the sink.

Where do I make a mistake?

Welcome Iguminski. Fair play for asking. The code is not clear at all, and examples should be commented, otherwise what's the point....

It's not quite clear what the intention of the authors was, but what it actually does in my understanding is more or less what you describe, except that only messages matched in the filter_map get bounced back to the server. So if the server sends Ping or Close, the client answers Pong or Close respectively...

The select combinator just merges the input from stdin with the filtered server stream, and forward then sends everything that comes out to the server. If I understand it correctly, the map_err on stdin_ch is there to give both streams the same Error type, since select will not compile otherwise.
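To make the merging step concrete, here is a rough std-only model of what select followed by forward amounts to. It is only a sketch: the function name select_round_robin is made up for illustration, plain Vecs stand in for the two streams and for the sink, and every source is treated as always ready, which a real Stream is not.

```rust
// Hypothetical model of `a.select(b).forward(sink)` using plain Vecs.
// Assumption: both sources are always ready, so the merge strictly alternates.
// A real select only alternates between streams that have an item ready.
fn select_round_robin<T>(a: Vec<T>, b: Vec<T>) -> Vec<T> {
    let mut a = a.into_iter();
    let mut b = b.into_iter();
    let mut sink = Vec::new(); // stands in for the websocket sink
    let mut take_a = true; // which source gets asked first this round

    loop {
        // Ask the preferred source first; fall back to the other one if it is exhausted.
        let next = if take_a {
            a.next().or_else(|| b.next())
        } else {
            b.next().or_else(|| a.next())
        };
        match next {
            Some(item) => sink.push(item), // "forward" the item to the sink
            None => break,                 // both sources exhausted
        }
        take_a = !take_a;
    }
    sink
}
```

In the client, the first source would be the already filtered server stream and the second the stdin channel, so only the filtered replies and the user's own messages ever reach the sink.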

What's confusing is that the only place where the server sends a Close is after the connection is dropped, so why the client would try to bounce the Close message back to the server is beyond me. As per the server code, once it has sent the Close it has also stopped listening... Either I don't understand exactly what the code does, or the authors have confused themselves through lack of documentation...

It is nice to share my confusion with someone. Thanks @najamelan.

When it comes to closing, you spotted a genuine error. Somebody already found a solution to this.

But apart from the closing issue, the code seems to work. And this surprises me (and also frustrates me a bit).

When the client initiates a ping (you need to run async-server, then async-client and type "/ping"), the server responds with "Pong" and the client does not bounce this message back:

$ tshark -i loop -f "port 2794" -Y websocket
Capturing on 'Loopback: lo0'
15 1.544717 127.0.0.1 → 127.0.0.1 WebSocket 70 WebSocket Text [FIN]
17 8.628668 127.0.0.1 → 127.0.0.1 WebSocket 66 WebSocket Ping [FIN] [MASKED]
19 8.629020 127.0.0.1 → 127.0.0.1 WebSocket 62 WebSocket Pong [FIN]
21 16.203934 127.0.0.1 → 127.0.0.1 WebSocket 62 WebSocket Connection Close [FIN] [MASKED]
23 16.204658 127.0.0.1 → 127.0.0.1 WebSocket 58 WebSocket Connection Close [FIN]
27 16.205070 127.0.0.1 → 127.0.0.1 WebSocket 62 WebSocket Connection Close [FIN] [MASKED]

I think I will ask this question as an issue in that project, although at first I thought that this is something about the general behaviour of Streams that I simply don't understand.

Reposted as an issue in the original project.

Maybe I didn't phrase that clearly. The following match decides what the client bounces back:

match message 
{
    OwnedMessage::Close(e) => Some( OwnedMessage::Close(e) ),
    OwnedMessage::Ping (d) => Some( OwnedMessage::Pong (d) ),
    _                      => None                          ,
}

The filter_map will drop any None values. The only messages for which this closure does not return None are a Close and a Ping received from the server. If the client receives a Pong message from the server, it does nothing.
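If it helps, the same filtering can be tried out without any websocket at all. This is just a sketch: Msg is a simplified, made-up stand-in for OwnedMessage (same variant names, simplified payloads), reply_for and replies are hypothetical names, and Iterator::filter_map is used in place of Stream::filter_map, assuming the two treat None the same way.

```rust
// Simplified stand-in for websocket's OwnedMessage (assumption: same variant
// names, but the payload types are simplified for this sketch).
#[derive(Debug, Clone, PartialEq)]
enum Msg {
    Text(String),
    Ping(Vec<u8>),
    Pong(Vec<u8>),
    Close(Option<String>),
}

// Same shape as the closure passed to filter_map in the client example.
fn reply_for(message: Msg) -> Option<Msg> {
    match message {
        Msg::Close(e) => Some(Msg::Close(e)), // echo the Close back
        Msg::Ping(d) => Some(Msg::Pong(d)),   // answer a Ping with a Pong
        _ => None,                            // Text, Pong, ...: no reply
    }
}

// Keep only the Some(..) results, i.e. the messages the client will send back.
fn replies(incoming: Vec<Msg>) -> Vec<Msg> {
    incoming.into_iter().filter_map(reply_for).collect()
}
```

Feeding it a Text, a Ping, a Pong and a Close yields just two replies, a Pong and a Close, which matches the capture where the server's Pong is not answered.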

Let me know if that clarifies things...

Ah, right! I didn't notice that it matches OwnedMessage::Ping. Somehow I read this as if it was

OwnedMessage::Pong (d) => Some( OwnedMessage::Pong (d) ),

I think it was an auto-suggestion. Now everything is clear. Thanks!
