Tokio AsyncReadExt `read_exact` method hangs indefinately

I'm trying to implement a JSON RPC server/client connection, but I keep having an issue where either my client or server will hang indefinitely when I'm trying to use the read_exact method (or seemingly any other read method).
I have this helper struct which contains all logic needed for sending/receiving

pub struct TcpPacket;
impl TcpPacket {
    const HEADER_SIZE: usize = std::mem::size_of::<u32>() / std::mem::size_of::<u8>();

    pub fn serialize(mes: impl Into<seraphic::socket::Message>) -> Vec<u8> {
        let payload = Into::<seraphic::socket::Message>::into(mes);
        let vec = serde_json::to_vec(&payload).unwrap();
        let size: u32 = vec.len() as u32;

        tracing::warn!("serialized payload of size: {size}");

        let mut buff = Vec::with_capacity(Self::HEADER_SIZE + vec.len());
        buff.extend_from_slice(&size.to_le_bytes());
        buff.extend_from_slice(&vec);
        buff
    }

    pub async fn read_from_stream<R>(reader: &mut R) -> tokio::io::Result<seraphic::socket::Message>
    where
        R: AsyncReadExt + Unpin,
    {
        let mut header_buf = [0u8; Self::HEADER_SIZE];

        tokio::time::timeout(
            Duration::from_millis(250),
            reader.read_exact(&mut header_buf),
        )
        .await
        .expect("timeout when reading")
        .expect("failed to read header");

        let payload_size = u32::from_le_bytes(header_buf) as usize;
        tracing::warn!("expecting payload of size: {payload_size}");

        let mut payload_buf = vec![0; payload_size];

        tokio::time::timeout(
            Duration::from_millis(250),
            reader.read_exact(&mut payload_buf),
        )
        .await
        .expect("timeout when reading")
        .expect("failed to read payload");

        let message: seraphic::socket::Message = serde_json::from_slice(&payload_buf)
            .map_err(|e| tokio::io::Error::new(tokio::io::ErrorKind::InvalidData, e))?;
        tracing::warn!("got message: {message:#?}");

        Ok(message)
    }
}

The problem arises on either read_exact call in the read_from_stream method. Seemingly randomly, either one of them will hang indefinitely (which is why I have added the timeout guards). I would love help with this as I've been stuck on this for quite awhile and just want to move forward with my project.

Context:

  • seraphic::socket::Message is simply a wrapper around JSON RPC Messages in compliance with the JSON RPC 2.0 spec
  • Other read methods also hang indefinately

This seems really dangerous. If read_exact times out you might have read part of the payload or part of the header, but not all of it, and the stream will be in an invalid state. Your future panics but if it's being spawned, does that panic propagate?

Sorry I'm a little confused by the way you've worded your response.. if the future panics it does propagate

tokio::task::spawn is a panic boundary so you have to unwrap the JoinHandle or at least look at the Result from awaiting it.

This is a minor thing, but it's more idiomatic to write:

R: AsyncRead

Functionally they are equivalent, but the trait that the types actually implement is AsyncRead, so it makes more sense to specify that one.

Recently I experienced a similar hang with tokio::io::SimplexStream, it turns out that dropping the write half of a simplex does not shutdown the write end automatically.

    #[tokio::test]
    async fn test_tokio_simplex() {
        let (r, mut w) = tokio::io::simplex(1024);
        let mut buf = [0u8; 1024];

        tokio::spawn(async move {
            w.write_all(b"test").await.unwrap();
            let _ = w.shutdown().await; // Without this line this test hangs forever
        });
        let mut r = r;
        let r = r.read_exact(&mut buf).await;
        assert!(matches!(r,
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof
        ));
    }