Implementing AsyncWrite to chain writers

I have two goals: learn idiomatic Rust and create a web server while doing it. My 30 years of C and its ilk are not helping at this point. I was feeling like my progress was great until futures rubbed lifetimes, pins, and ownership all over my face. Humbled, I feel like I have read every line written on these topics and I still don't quite get it. Regardless, plowing bravely forward, I am trying to get the encoding part of the server working properly.

In short, I want to chain transformations together similar to Java Input/OutputStreams. For example (pseudo-code):

let chunk_encoder = ChunkEncoder::new(socket);
let mut gzip_encoder = GzipEncoder::new(chunk_encoder);

while ( ... ) { // bytes are being read from a file into buf, say
    gzip_encoder.write_all(buf).await?;
}
gzip_encoder.shutdown().await?; // finish gzip and transfer but do not close socket

This would stream out chunk-encoded gzip (versus buffering it all in memory and sending it all at once). Conceivably I could chain as many AsyncWrites together as needed.
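For reference, the chunked framing I'm aiming for is easy to sketch in plain blocking code (a std-only illustration, not my actual encoder; the helper names here are made up): each chunk is the payload length in hex, CRLF, the payload, CRLF, and the stream is terminated by a zero-length chunk with an empty trailer.

```rust
use std::io::Write;

/// Frame `data` as a single HTTP/1.1 chunk: hex length, CRLF, payload, CRLF.
/// (Hypothetical helper for illustration only.)
fn encode_chunk(data: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(data.len() + 16);
    // The chunk size is written in hexadecimal, with no leading "0x".
    write!(out, "{:X}\r\n", data.len()).unwrap();
    out.extend_from_slice(data);
    out.extend_from_slice(b"\r\n");
    out
}

/// The terminating chunk: zero length followed by an empty trailer section.
fn final_chunk() -> &'static [u8] {
    b"0\r\n\r\n"
}
```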

My thought was to simply (hah!) implement AsyncWrite on my ChunkEncoder struct and pass it to GzipEncoder as above. I am open to the suggestion that my approach is wrong and I should swing at it differently. Regardless, below is what I have so far. I have gone through a bunch of iterations: storing the future in the struct, passing through the future, trying to understand pin projections, and have landed on the below.

But I seem to have a race condition between sending the last chunk ( b"0\r\n\r\n" ) and the socket getting closed on the server side. Sometimes the chunk comes through, but more often the client receives a RST packet.

I am just posting the relevant code. If more is needed for context I am happy to add that as well.

pub trait AsyncSocketStream: AsyncRead + AsyncWrite + Send + Sync {}
impl<T: AsyncRead + AsyncWrite + Send + Sync> AsyncSocketStream for T {}
type BoxStream = Pin<Box<dyn AsyncSocketStream>>;

struct ChunkEncoder<'a> {
    stream: &'a mut BoxStream,
}

impl<'a> ChunkEncoder<'a> {
    fn new(stream: &'a mut BoxStream) -> ChunkEncoder<'a> {
        ChunkEncoder {
            stream,
        }
    }
}

impl ChunkEncoder<'_> {
    pub async fn end(&mut self) -> Result<(), Box<dyn Error>> {
        println!("Calling end");
        self.stream.write_all(b"0\r\n\r\n").await?;
        Ok(())
    }
}

impl<'a> AsyncWrite for ChunkEncoder<'a> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        println!("poll_write: {}", buf.len());

        let length = format!("{:X}\r\n", buf.len());

        let v = vec![
            IoSlice::new(length.as_bytes()),
            IoSlice::new(buf),
            IoSlice::new(b"\r\n"),
        ];

        match AsyncWrite::poll_write_vectored(Pin::new(self.get_mut().stream), cx, &v) {
            Poll::Ready(_) => Poll::Ready(Ok(buf.len())),
            Poll::Pending => Poll::Pending,
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        println!("poll_flush");
        AsyncWrite::poll_flush(Pin::new(self.as_mut().stream), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        println!("poll_shutdown");
        Poll::Ready(Ok(())) // Place holder - deal with this when I figure out write
    }
}

async fn process(stream: &mut BoxStream) -> Result<(), Box<dyn Error>> {
    let path = std::path::Path::new("./src/main.rs");

    let mut file = match File::open(path) {
        Ok(file) => file,
        Err(error) => {
            println!("Could not open file for read: {}", error);
            // Return a 404 in future code
            return Err(Box::new(error));
        }
    };

    stream
        .write_all(
            "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\nContent-Encoding: gzip\r\n\r\n".as_bytes(),
        )
        .await?;

    let chunk_encoder = ChunkEncoder::new(stream);

    let mut writer = GzipEncoder::new(chunk_encoder);

    let mut buffer: [u8; 64] = [0; 64];
    loop {
        let r = match file.read(&mut buffer) {
            Ok(r) => r,
            Err(error) => {
                println!("Error while reading file: {}", error);
                return Err(Box::new(error));
            }
        };

        if r == 0 {
            break;
        }

        writer.write_all(&buffer[0..r]).await?;
    }

    writer.shutdown().await?;
    let mut res = writer.into_inner();

    res.end().await?;  // This should send the final chunk
    res.stream.flush().await?;

    Ok(())
}

Suggestions and criticisms are welcome and invited.

poll_write_vectored doesn't guarantee that it writes all of the chunks. In fact, the default implementation can only ever write a single buffer of the ones provided (you didn't specify which crates you're using, but this is true of both tokio's AsyncWrite and futures's).

Additionally, you aren't checking how much of the data was written, which is returned from the method. Even if your writer implements poll_write_vectored in a better way than the default impl, it's possible it still won't be able to write all of the data in one call.
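As a sketch of what handling short writes looks like with std's blocking Write trait (ShortWriter here is a made-up type that simulates a sink accepting only a few bytes per call; the async version is the same loop, just driven by poll_write):

```rust
use std::io::{self, Write};

/// A writer that accepts at most `limit` bytes per `write` call, simulating
/// a sink that performs short writes (as sockets legitimately may).
struct ShortWriter {
    inner: Vec<u8>,
    limit: usize,
}

impl Write for ShortWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = buf.len().min(self.limit);
        self.inner.extend_from_slice(&buf[..n]);
        Ok(n) // report how much was actually accepted
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Keep calling `write` until the whole buffer is accepted, advancing past
/// the bytes already written -- the loop `write_all` performs for you.
fn write_all_by_hand<W: Write>(w: &mut W, mut buf: &[u8]) -> io::Result<()> {
    while !buf.is_empty() {
        let n = w.write(buf)?;
        if n == 0 {
            return Err(io::Error::new(io::ErrorKind::WriteZero, "short write"));
        }
        buf = &buf[n..];
    }
    Ok(())
}
```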

You are 100% correct. Thanks for the feedback. That is absolutely a bug and future problem. This code is in the "prototype until it works" stage and that is still to come. Empirically the data is getting written in its entirety to the socket and being received by the client, except for the chunk trailer which is sent in .end.

Well, you shouldn't be calling shutdown before you try to send more data, but since your ChunkEncoder is doing nothing in its poll_shutdown I don't think that should matter, assuming you're using async_compression's GzipEncoder, since it just forwards the poll_shutdown call.

Do you know if an error is getting returned from any of those calls that you might not be noticing? The connection value being dropped due to an early return would be consistent with the behavior you're seeing.


There is a bit of square peg / round hole work going on here. Though it makes sense in isolation, calling shutdown to finish the gzipping makes less sense when chaining things together like I am, which likely points to a poor choice on my part.

Regardless, no errors are being returned from any calls. Oddly, the RST packet is being sent proactively, not in response to a packet from the client. Logically, somewhere the socket is getting dropped and not closing cleanly. I just do not see it.
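For my own understanding, here's a tiny std-only demonstration of the flush-ordering issue I suspect: an intermediate writer can hold bytes back until it is explicitly flushed, the same way the gzip layer holds back its trailer until shutdown. BufWriter stands in for the async layers here; this is illustrative only.

```rust
use std::io::{BufWriter, Write};

/// Returns (bytes visible in the inner sink before flush, after flush).
/// Shows that an intermediate writer can buffer data invisibly until it
/// is flushed -- if the connection is dropped first, those bytes are lost.
fn buffered_visibility() -> (usize, usize) {
    let mut w = BufWriter::with_capacity(64, Vec::new());
    w.write_all(b"0\r\n\r\n").unwrap();
    let before = w.get_ref().len(); // still sitting in BufWriter's buffer
    w.flush().unwrap();
    let after = w.get_ref().len(); // now pushed through to the inner Vec
    (before, after)
}
```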

I assume from the printlns sprinkled around that you've already ensured that all of the expected functions are called in the right order.

How are you calling process? It's possible the future itself is getting dropped before finishing for some reason.

Here is my main that calls spawn:

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let _guards = init_debug_logging();

    let bind = "127.0.0.1";
    let port = 8080;
    let address = format!("{}:{}", bind, port);

    debug!("Starting HTTP server on {}", address);

    let addr = address
        .to_socket_addrs()?
        .next()
        .ok_or_else(|| io::Error::from(io::ErrorKind::AddrNotAvailable))?;

    let listener = TcpListener::bind(&addr).await?;

    loop {
        let (stream, peer_addr) = listener.accept().await?;

        debug!("Accepted connection from {} on port {}", peer_addr, address);

        tokio::spawn(async move {
            let mut stream: BoxStream = Box::pin(stream);
            match process(&mut stream).await {
                Ok(_) => println!("Client process exited cleanly"),
                Err(error) => println!("Client process exited with error: {}", error),
            }
        });
    }

    #[allow(unreachable_code)]
    Ok(())
}
