Tokio TCP Server - mpsc channel doesn't receive messages after the first one

I am new to Rust (and to TCP networking applications in general) and am attempting to implement a server using Tokio that spawns two tasks. The first reads incoming bytes, parses them, submits the parsed data to an external API, sends related messages to an mpsc::channel, and writes a response back to the client. The second receives the messages from the channel and processes them (in order to send requests to another API concurrently).

The main processing task seems to be working, but the second one that receives messages only seems to work with the first message sent by the Sender. Subsequent messages sent over the channel are not read, although I can see that the send() call did not return a SendError.

This is the general structure of the code:

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        let (sender, mut receiver) = mpsc::channel::<ChannelElement>(32);
        tokio::spawn(async move { process(socket, sender).await.unwrap() });
        tokio::spawn(async move {
            process_channel_data(&mut receiver).await.unwrap()
        });
    }

    Ok(())
}

async fn process(socket: TcpStream, sender: Sender<ChannelElement>) -> Result<(), Box<dyn Error>> {
    loop {
        let ready = socket
            .ready(Interest::READABLE | Interest::WRITABLE)
            .await?;
        if ready.is_readable() {
            let mut data = vec![0; 1024];
            match socket.try_read(&mut data) {
                Ok(0) => {
                    break;
                }
                Ok(n) => {
                    data.truncate(n);
                    println!("received {} bytes", n);
                    // do some stuff with the data
                    // to get String processed_data
                    // some helper function that continues to process the data in some way
                    // and also sends messages to the channel for the other task to process
                    if let Ok(resp_str) = helper_function(&processed_data, &sender) {
                        loop {
                            if ready.is_writable() {
                                match socket.try_write(resp_str.as_bytes()) {
                                    Ok(n) => {
                                        println!("wrote {} bytes", n);
                                        break;
                                    }
                                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                                        continue;
                                    }
                                    Err(e) => {
                                        return Err(e.into());
                                    }
                                }
                            }
                        }
                    }
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    continue;
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
    }
    Ok(())
}

async fn helper_function(processed_data: &str, sender: &Sender<ChannelElement>) -> Result<(), Box<dyn Error>> {
    // there are multiple send blocks like:
    match sender
        .send(ChannelElement {
            // data
        })
        .await
    {
        Ok(_) => println!("sent message to channel"),
        Err(e) => println!("{e:?}"),
    }
    // do some other stuff
    Ok(())
}

async fn process_channel_data(receiver: &mut Receiver<ChannelElement>) -> Result<(), Box<dyn Error>> {
    loop {
        let message = receiver.recv().await;
        match message {
            // the channel has been closed, due to the sender being dropped after all sends
            None => {
                break;
            }
            Some(message) => {
                println!("received message: {message:?}");
                // do something with the message
            }
        }
    }
    Ok(())
}

I'm sure this implementation is not idiomatic and there are definite improvements to be made, but I want to fully understand why this initial naive solution doesn't work as I expect.

I've only skimmed your code, but the inner write loop looks suspicious:

If ready.is_writable() returns false, this becomes an endless loop, because nothing inside the loop awaits or refreshes ready.

Yeah, that was the issue. I was able to fix it by splitting the socket into OwnedReadHalf and OwnedWriteHalf, and then in the writing loop I changed it to

loop {
    write_socket.writable().await?;
    match write_socket.try_write(response_str.as_bytes()) {
        //
    }
}

so that the writable() check happens at the start of the loop, like how the ready() call was in the original read portion.

I'm still a bit confused as to how the original infinite loop never broke out, because in my logs for an older execution I can see the "wrote x bytes" log, which is in the block that breaks the write loop.

There are operations in the outer loop that are missing awaits. Writing to the TcpStream should have one, calling the helper function should have one, and so forth.

It’s hard to tell if these are transcription errors or something else, but each task needs to periodically yield to the tokio runtime to give it a chance to make progress on other async tasks.

If the only operation that awaits in a loop is one that always returns Poll::Ready, that’s a blocking operation. Rather than testing socket readiness, await on a read or a write operation. That will properly yield while there is nothing to read or write.

Note that

write_socket.writable().await?;
match write_socket.try_write(response_str.as_bytes()) {

is the same as

match write_socket.write(response_str.as_bytes()).await {

Part of it is a transcription error, the helper_function is actually await-ed. I also moved the writing loop into another async function that I await as well. And if I understand Alice's response correctly, writable() + try_write()/readable() + try_read() are the same as the await-ed write()/read(), although maybe it isn't as widely used.

I am pointing out the possibility that calls like

let ready = socket
    .ready(Interest::READABLE | Interest::WRITABLE)
    .await?;

are allowed to return immediately without yielding, even though there is an await there. [1] I wrote a bit about this subject a while ago: Thoughts about profiling Rust/Tokio applications - #13 by parasyte

[1] I am not implying that this exact method behaves this way, just that it is allowed to.

I removed that socket.ready() line and instead now both the read and the write call their respective readable()/writable() functions:

read_socket.readable().await?;
match read_socket.try_read(&mut data) {
    // read code
}

and likewise for the OwnedWriteHalf.

From the documentation, though, I think these calls are equivalent to a similar call to ready(). Are you saying that if an await-ed call to a function like this is the only await-ed operation in the loop, the task might never yield? And since there are other operations that do yield (which I accidentally omitted from my original example), this implementation should be fine to run as a Tokio task?

Sorry for the amount of questions, trying to get a better understanding of tasks/futures.

Checking for io::ErrorKind::WouldBlock in a loop is a yellow flag, because that signals the loop has a degenerate case that never yields. It's unlikely to happen, but what are you gaining by taking this risk over using read(&mut data).await?

Having a loop that calls readable().await followed by try_read() is okay and checking for WouldBlock in that case is correct. That's how try_read is supposed to be used. It's just a super low level use-case, and normally you would use read instead.

Right. I pointed to WouldBlock because it's a hint that the loop needs to yield. Calling readable().await will yield. But that isn't what happened in the example code in the original post. It reused an old result from let ready = socket.ready(...).await?; without awaiting again. That's the risk I'm highlighting. It's what led to the infinite loop in the first place.