(Tokio & Tungstenite) Having a background reading task but not having to .await for it

Sorry if the title is a tad confusing, I struggled to find the words to summarize my issue.

I have a server that reads data from a serial port and forwards it to clients connected via WebSocket (using Tungstenite). A client might also send commands.

When I first implemented a "blocking" version, I had two threads: one for reading new messages received by the server and one for writing the serial data to the connected client. For many reasons this wasn't practical, and I would like to implement an async version using tokio (and the binding for Tungstenite).

One issue is that I am not just answering my client's messages (which is what almost all the examples in tokio-tungstenite do); I need to actively push unrequested messages to my client.

So if I have the following code:

async fn handle_client(peer: SocketAddr, stream: TcpStream, pck_receiver: BusReader<Packet>) {
    let mut own_bus = pck_receiver;
    let mut ws = accept_async(stream)
        .await
        .expect("Failing to accept socket.");
    info!("Accepted new client");

    let (mut sender, mut receiver) = ws.split();

    loop {
        receiver.by_ref()
            .try_for_each(|msg| {
                info!("received msg: {}", msg);
                future::ok(())
            })
            .await.expect("Couldnt receive?");

        let b = own_bus.try_recv(); // try_recv is non-blocking
        match b {
            Ok(pck) => {
                sender
                    .send(Message::Text(
                        serde_json::to_string(&pck).expect("could not serialize packet"),
                    ))
                    .await
                    .expect("message could not be sent");
            }
            _ => (),
        }
    }
}

I will have the issue that the first part of the loop, where we read the received messages, will block the rest of the loop where we write the packet to the client. And because of this, I won't be able to send the packet to my client unless they first send something.

So my goal would be to isolate the "reader" part of the code in its own task that I don't wait for. But I don't know how to do this, especially since I cannot clone the WebSocketStream nor the receiver.

Since I am a complete beginner with tokio and Rust futures, I feel like I am just doing things wrong and there is a much better way, but after reading a lot and trying a lot of different approaches, I cannot find how to do this.

Do you have any suggestions?

A common strategy, probably the most appropriate in this case, is to use a select! macro, which lets you await different kinds of futures at once and respond to whichever one actually occurred. This page in the tokio documentation would be a good starting point.

An alternative which is sometimes more appropriate is to link multiple tasks with a fan-in channel. This is particularly helpful when there is an unknown number of sending-side tasks or they change over time.

Imagine you created an Event like this:

enum Event {
    MessageFromClient(String),
    SerialData(Vec<u8>),
    ClientDisconnected,
}

You could create an mpsc channel and spawn three tasks.

  1. "Writer" - move in the channel receiver and the WS sender. It reads from the channel in a loop, matches on the kind of Event, and writes the appropriate thing to the client.
  2. "Serial reader" - move in a clone of the channel sender. It reads text from the serial port, puts it in an Event::SerialData, and sends it in the channel. It then goes straight back to reading.
  3. "Client reader" - move in the channel sender and the WS receiver. It reads from the websocket, and each time it gets a message it creates an Event::MessageFromClient and sends it into the channel. When reading fails it sends Event::ClientDisconnected and terminates.
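
As a rough sketch of the three-task layout above, using only the standard library: std::thread and std::sync::mpsc stand in for tokio tasks and tokio::sync::mpsc, and the serial port and websocket are replaced by hard-coded values. Both substitutions are assumptions made to keep the example dependency-free, and the name run_fan_in and the log strings are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Debug)]
enum Event {
    MessageFromClient(String),
    SerialData(Vec<u8>),
    ClientDisconnected,
}

/// Spawns the two producers, runs the "writer" on the calling thread,
/// and returns a log of what the writer did with each event.
fn run_fan_in() -> Vec<String> {
    let (tx, rx) = mpsc::channel::<Event>();

    // "Serial reader": owns a clone of the sender (fake serial chunks here).
    let serial_tx = tx.clone();
    let serial = thread::spawn(move || {
        for chunk in vec![vec![0x01u8, 0x02], vec![0x03]] {
            // A send error only means the writer is gone, so stop quietly.
            if serial_tx.send(Event::SerialData(chunk)).is_err() {
                break;
            }
        }
    });

    // "Client reader": owns the original sender (fake websocket input here).
    let client = thread::spawn(move || {
        let _ = tx.send(Event::MessageFromClient("hello".to_string()));
        let _ = tx.send(Event::ClientDisconnected);
    });

    // "Writer": the single consumer. The loop ends once every sender is dropped.
    let mut log = Vec::new();
    for event in rx {
        match event {
            Event::SerialData(bytes) => {
                log.push(format!("forward {} byte(s) to client", bytes.len()))
            }
            Event::MessageFromClient(text) => {
                log.push(format!("handle command: {}", text))
            }
            Event::ClientDisconnected => log.push("client disconnected".to_string()),
        }
    }

    serial.join().unwrap();
    client.join().unwrap();
    log
}
```

The order in which events from the two producers interleave is not fixed; only the order within each producer is preserved.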

But in this case I'd just use tokio::select!.

It seems to me that FuturesUnordered or StreamMap, rather than select!, may be the right tool here.

Assuming a sequence of

  1. reading from the serial port starts but doesn't complete or become Ready before
  2. an "unrequested message" comes in and is ready.

I would think that select! could cancel a partial read from the serial port once the unrequested message becomes Ready, whereas a FuturesUnordered or StreamMap would just return the first task that is ready without canceling the other.

Hello, thank you for your answers.

A common strategy, probably the most appropriate in this case, is to use a select! macro, which lets you await different kinds of futures at once and respond to whichever one actually occurred. This page in the tokio documentation would be a good starting point.

This is what I saw a lot of examples do, but I had one big question about it: the docs say that all the other branches are dropped after one returns. In my case the "writer" branch (the one sending the serial data to the ws) will return almost immediately 99% of the time, since the bus I am reading from is non-blocking and not asynchronous. So my worry is that the "reader" branch would almost always be dropped before it had the chance to do anything. Would it work like that?

But I guess the solution would be to make the bus read blocking or, ideally, switch to another mpmc channel implementation that supports async (like flume or async-channel)?

An alternative which is sometimes more appropriate is to link multiple tasks with a fan-in channel.

I do like this idea a lot actually. It does involve more code, but I feel like it is a more "flexible" system. Although, for my current use-case, it might be a bit overkill. But I will definitely keep this idea in mind.

I would think that using select! would potentially be canceling a partial read from the serial port once the unrequested message becomes Ready.

I don't think I would have this issue since, right now, the code that handles the serial reading is completely synchronous and lives in its own thread. It communicates with the rest of the code via an mpmc channel, so I think the worst that can happen would be a partial read from the mpmc channel?

Where a futures unordered or streammap would just return the first task which is ready without canceling the other.

This could be exactly what I want, but then what would happen in the next iteration of the loop, where we would spawn a new "reader" and "writer" future while the previous ones possibly haven't been dropped? Wouldn't all the futures start stepping on each other's toes?

I think for now I will try again with the select! macro, using an async mpmc channel, and if that does not work I will look into FuturesUnordered and StreamMap.

I have worked a bit with async and futures in other languages, but I feel like I need to learn a lot more before I am comfortable with Rust async programming.

OK, I think I really misunderstood how select! works. I changed my code to this (also switching the mpmc channel for an async one):

async fn handle_client(peer: SocketAddr, stream: TcpStream, pck_receiver: Receiver<Packet>) {
    let mut ws = accept_async(stream)
        .await
        .expect("Failing to accept socket.");
    info!("Accepted new client");

    let (mut sender, mut receiver) = ws.split();
    
    loop {
        tokio::select! {
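            msg = receiver.next() => {
                match msg {
                    Some(Ok(msg)) => info!("received msg: {}", msg),
                    // Stream ended or errored: the client is gone, so stop
                    // the loop instead of panicking on unwrap.
                    _ => break,
                }
            }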
            pck = pck_receiver.recv_async() => {
                match pck {
                    Ok(pck) => {
                        sender
                            .send(Message::Text(
                                serde_json::to_string(&pck).expect("could not serialize packet"),
                            ))
                            .await
                            .expect("message could not be sent");
                    }
                    _ => (),
                }
            }
        }
    }
}

And it works perfectly! Sending messages takes a while, but I feel like this is more of a waiting-for-write-buffer-to-be-full issue.

Edit: Hmm, actually, the long delay before sending a message seems to be an issue in my code that I need to figure out. It seems to take a very long time (4-5 seconds) to enter the "sender" branch of the select!.

Edit2: Okay, so apparently flume doesn't really like it when one consumer is sync and one is async? Turning off the sync reader makes the messages get sent instantaneously.

Are you using the sync side inside an async task? Otherwise it should work fine.

I think it is actually just that I misunderstood what the mpmc channel from flume does. What I actually want is more like a "bus" that broadcasts each message to all receivers: if one receiver consumes it, it still exists in the other receivers' queues.
But that is not what flume does; the documentation even mentions it:

Note: Cloning the receiver does not turn this channel into a broadcast channel. Each message will only be received by a single receiver. This is useful for implementing work stealing for concurrent programs.

So what I was seeing was not slowness but the other receiver stealing almost all the values.
So I have to find an async broadcast channel or make my own, I suppose.

Edit: I just realized that tokio has a broadcast channel in tokio::sync::broadcast
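
To illustrate the broadcast semantics without extra dependencies, here is a minimal hand-rolled sketch of such a bus, assuming only the standard library. Bus, subscribe, broadcast and demo_broadcast are hypothetical names for this example, not flume's or tokio's API. Every subscriber gets its own queue, and each value is cloned into all of them, so no receiver can "steal" a message from another.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical hand-rolled bus: one std channel per subscriber.
struct Bus<T: Clone> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> Bus<T> {
    fn new() -> Self {
        Bus { subscribers: Vec::new() }
    }

    /// Registers a new subscriber and returns its private receiving end.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = mpsc::channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends a clone of `value` to every subscriber and forgets
    /// subscribers whose receiver has been dropped.
    fn broadcast(&mut self, value: T) {
        self.subscribers.retain(|tx| tx.send(value.clone()).is_ok());
    }
}

/// Broadcasts three packets to two subscribers and returns what each one saw.
fn demo_broadcast() -> (Vec<u32>, Vec<u32>) {
    let mut bus = Bus::new();
    let a = bus.subscribe();
    let b = bus.subscribe();

    for pck in 1..=3 {
        bus.broadcast(pck);
    }

    // Dropping the bus drops every sender, which ends the iterators below.
    drop(bus);
    (a.iter().collect(), b.iter().collect())
}
```

Both subscribers see every packet, in order. tokio::sync::broadcast likewise requires the message type to implement Clone, and new receivers are obtained by subscribing on the sender side.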
