In tokio, trying to handle multiple message sources in one task

I'm making a server using tokio. (This server.)

It's been pretty smooth sailing thus far. My inner per-client task is structured like this: (full, hideous code here)

/// Repeatedly uses `socket.read_until()` until a whole message is received
async fn get_message(socket: &mut BufReader<TcpStream>)
    -> std::io::Result<Option<Message>>;
/// Repeatedly uses `socket.write_all()` until the whole message is sent
async fn send_response(socket: &mut BufReader<TcpStream>, response: Message)
    -> std::io::Result<()>;

async fn inner_client(socket: TcpStream) -> std::io::Result<()> {
    let mut socket = BufReader::new(socket);
    loop {
        let message = match get_message(&mut socket).await? {
            Some(x) => x,
            None => return Ok(()),
        };
        // handle the message, which might include:
        send_response(&mut socket, some_response).await?;
    }
}
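
For context, each client gets its own task from a pretty standard accept loop. A simplified sketch, not the real code (the bind address and error handling are placeholders):

use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // placeholder address
    let listener = TcpListener::bind("127.0.0.1:4000").await?;
    loop {
        let (socket, _addr) = listener.accept().await?;
        // one spawned task per connected client
        tokio::spawn(async move {
            if let Err(e) = inner_client(socket).await {
                eprintln!("client error: {e}");
            }
        });
    }
}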

All well and good so far. But now I have a problem. Up until now, every server-to-client message has been a direct response to a client-to-server message. Now I've added a message type that requires the server to push a message to other connected clients, without them having to poll for it.

What I want is something like:

async fn inner_client(socket: TcpStream, mut oob_messages: Receiver<Message>)
    -> std::io::Result<()> {
    let mut socket = BufReader::new(socket);
    loop {
        tokio::select! {
            message = get_message(&mut socket) => {
                let message = match message? {
                    Some(x) => x,
                    None => return Ok(()),
                };
                // handle the message, which might include:
                send_response(&mut socket, some_response).await?;
            },
            oob = oob_messages.recv() => {
                // something like:
                send_response(&mut socket, some_response).await?;
            },
        }
    }
}

But even without trying to compile that code, I can see a problem: if the oob_messages.recv() branch runs, then the next time through the loop a new get_message future is created and the previous one is dropped, along with whatever it had already read. This would cause problems... I think. (The real get_message holds the incomplete message in a buffer, omitted from the simplification above, but since that buffer is local to the call it gets dropped with the future, so I don't think that's enough to save it.)

I've tried structuring it so that the get_message future is held onto "outside" of the loop, and recreated every time a message is received, but aside from being clunky and ugly, that creates a situation where socket is mutably borrowed more than once, which is a huge no-no. Another solution I've considered is splitting the client task into three: one that reads messages from the client, one that reads out-of-band messages, and one that does select! on an mpsc channel written to by the others... but then won't I be in trouble if the "read messages" task is waiting on socket.read_until and the select! task tries to write? (Guarding the socket with a Mutex would just make it deadlock there...) And then how do I cleanly break things off once the client has disconnected?

I'm just not sure how to proceed from here. This is my first serious attempt at using futures in any language, and I'm definitely not used to the pattern yet. Any help would be very much appreciated.


I finally found TcpStream::into_split() and have now taken the three-task approach. It seems to work, but I feel it's not as efficient as it could be. Oh well.
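
In case it helps anyone who finds this later, the shape is roughly the sketch below. This isn't my real code: it folds the writing side into the select! task, get_message has to be adjusted to read from the owned read half instead of the whole stream, and the channel capacity is arbitrary.

use tokio::io::BufReader;
use tokio::net::TcpStream;
use tokio::sync::mpsc;

async fn inner_client(
    socket: TcpStream,
    mut oob_messages: mpsc::Receiver<Message>,
) -> std::io::Result<()> {
    // Split the stream so one task can sit in a read while another writes.
    let (read_half, write_half) = socket.into_split();
    let mut reader = BufReader::new(read_half);

    // Reader task: pulls whole messages off the socket and forwards them.
    // (get_message now reads from a BufReader<OwnedReadHalf> instead of a
    // BufReader<TcpStream>.)
    let (from_client_tx, mut from_client) = mpsc::channel::<Message>(16);
    let read_task = tokio::spawn(async move {
        while let Ok(Some(msg)) = get_message(&mut reader).await {
            if from_client_tx.send(msg).await.is_err() {
                break; // the handling task has shut down
            }
        }
        // from_client_tx is dropped here; the closed channel tells the
        // handling task that the client disconnected (or errored out)
    });

    // Handling task (this one): the only place that ever writes to the
    // socket, so no Mutex is needed around write_half.
    loop {
        tokio::select! {
            msg = from_client.recv() => match msg {
                Some(_msg) => {
                    // handle the message, which might include sending a
                    // response on write_half
                }
                None => break, // client disconnected
            },
            oob = oob_messages.recv() => match oob {
                Some(_oob) => {
                    // serialize the out-of-band message onto write_half
                }
                None => break, // the rest of the server dropped its sender
            },
        }
    }

    read_task.abort();
    drop(write_half); // closes our side of the connection
    Ok(())
}

Since only the select! task ever touches the write half, there's no need to guard the socket with a Mutex.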

One way around the issue you described is to use tokio_util::codec, which lets you handle message framing in a way that doesn't lose data when used in a select!.
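
Something along these lines, for example. This is just a sketch: it uses LinesCodec and plain strings to show the shape; for your real Message type you'd write your own Decoder/Encoder impl.

use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio_util::codec::{Framed, LinesCodec};

async fn inner_client(
    socket: TcpStream,
    mut oob_messages: mpsc::Receiver<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Framed owns the read buffer, so cancelling the next() future in
    // select! doesn't throw away a partially received message.
    let mut framed = Framed::new(socket, LinesCodec::new());
    loop {
        tokio::select! {
            msg = framed.next() => match msg {
                Some(msg) => {
                    let msg = msg?;
                    // handle the message, which might include a response:
                    framed.send(format!("ack: {msg}")).await?;
                }
                None => return Ok(()), // client disconnected
            },
            oob = oob_messages.recv() => match oob {
                Some(oob) => framed.send(oob).await?,
                None => return Ok(()), // no more out-of-band senders
            },
        }
    }
}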


So that would let me reimplement get_message as a Stream... I see. That should get me back to where I want to be in performance and code cleanliness. Thanks.
