I have two goals: learn idiomatic Rust and create a web server while doing it. My 30 years of C and its ilk are not helping at this point. I was feeling like my progress was great until futures rubbed lifetimes, pins, and ownership all over my face. Humbled, I feel like I have read every line written on the topics and I still don't, quite, get it. Regardless, plowing bravely forward, I am trying to get the encoding part of the server working properly.
In short, I want to chain transformations together similar to Java Input/OutputStreams. For example (pseudo-code):
let chunk_encoder = ChunkEncoder::new(socket);
let mut gzip_encoder = GzipEncoder::new(chunk_encoder);
while ( ... ) { // bytes are being read from a file into buf, say
    gzip_encoder.write_all(buf).await?;
}
gzip_encoder.shutdown().await?; // finish gzip and transfer but do not close socket
This would stream out chunk encoded gzip ( versus buffering it all in memory and sending all at once ). Conceivably I could chain as many AsyncWrites together as needed.
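To make the chaining idea concrete, here is a minimal synchronous sketch of the same pattern using only `std::io::Write`. `ChunkWriter` and `finish` are hypothetical names, not part of my server code; the sketch only illustrates the chunked framing (hex length, CRLF, payload, CRLF, then a zero-length chunk at the end) and how one writer wraps another. It assumes the async version would follow the same shape.

```rust
use std::io::{self, Write};

/// Hypothetical synchronous stand-in for the async ChunkEncoder.
struct ChunkWriter<W: Write> {
    inner: W,
}

impl<W: Write> ChunkWriter<W> {
    fn new(inner: W) -> Self {
        ChunkWriter { inner }
    }

    /// Writes the terminating zero-length chunk, flushes, and returns
    /// the inner writer without closing it.
    fn finish(mut self) -> io::Result<W> {
        self.inner.write_all(b"0\r\n\r\n")?;
        self.inner.flush()?;
        Ok(self.inner)
    }
}

impl<W: Write> Write for ChunkWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if buf.is_empty() {
            // A zero-length chunk would be read as the end-of-body marker.
            return Ok(0);
        }
        // Each call becomes exactly one chunk on the wire.
        write!(self.inner, "{:X}\r\n", buf.len())?;
        self.inner.write_all(buf)?;
        self.inner.write_all(b"\r\n")?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

fn main() -> io::Result<()> {
    // A Vec<u8> plays the role of the socket here.
    let mut chunked = ChunkWriter::new(Vec::new());
    chunked.write_all(b"hello")?;
    chunked.write_all(b", world")?;
    let wire = chunked.finish()?;
    assert_eq!(wire, b"5\r\nhello\r\n7\r\n, world\r\n0\r\n\r\n");
    Ok(())
}
```

Any other `Write` adapter (a compressor, say) could wrap `ChunkWriter` in the same way, which is the layering I am after in the async code.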
My thought was to simply ( Hah! ) implement AsyncWrite on my ChunkEncoder struct and pass it to GzipEncoder as above. I am open to the suggestion that my approach is wrong and that I should swing at it differently. Regardless, below is what I have so far. I have gone through a bunch of iterations: storing the future in the struct, passing through the future, trying to understand pin projections, and have landed on the below.
But I seem to have a race condition between sending the last chunk ( b"0\r\n\r\n" ) and the socket getting closed on the server side. Sometimes the chunk comes through but more often the client receives a RST packet.
I am just posting the relevant code. If more is needed for context I am happy to add that as well.
pub trait AsyncSocketStream: AsyncRead + AsyncWrite + Send + Sync {}
impl<T: AsyncRead + AsyncWrite + Send + Sync> AsyncSocketStream for T {}
type BoxStream = Pin<Box<dyn AsyncSocketStream>>;

struct ChunkEncoder<'a> {
    stream: &'a mut BoxStream,
}

impl<'a> ChunkEncoder<'a> {
    fn new(stream: &'a mut BoxStream) -> ChunkEncoder<'a> {
        ChunkEncoder {
            stream,
        }
    }
}

impl ChunkEncoder<'_> {
    pub async fn end(&mut self) -> Result<(), Box<dyn Error>> {
        println!("Calling end");
        self.stream.write_all(b"0\r\n\r\n").await?;
        Ok(())
    }
}
impl<'a> AsyncWrite for ChunkEncoder<'a> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        println!("poll_write: {}", buf.len());
        let length = format!("{:X}\r\n", buf.len());
        let v = vec![
            IoSlice::new(length.as_bytes()),
            IoSlice::new(buf),
            IoSlice::new(b"\r\n"),
        ];
        match AsyncWrite::poll_write_vectored(Pin::new(self.get_mut().stream), cx, &v) {
            Poll::Ready(_) => Poll::Ready(Ok(buf.len())),
            Poll::Pending => Poll::Pending,
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        println!("poll_flush");
        AsyncWrite::poll_flush(Pin::new(self.as_mut().stream), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        println!("poll_shutdown");
        Poll::Ready(Ok(())) // Place holder - deal with this when I figure out write
    }
}
async fn process(stream: &mut BoxStream) -> Result<(), Box<dyn Error>> {
    let path = std::path::Path::new("./src/main.rs");
    let mut file = match File::open(path) {
        Ok(file) => file,
        Err(error) => {
            println!("Could not open file for read: {}", error);
            // Return a 404 in future code
            return Err(Box::new(error));
        }
    };
    stream
        .write_all(
            "HTTP/1.1 200 Ok\r\nConnection: close\r\nTransfer-Encoding: chunked\r\nContent-Encoding: gzip\r\n\r\n".as_bytes(),
        )
        .await?;
    let chunk_encoder = ChunkEncoder::new(stream);
    let mut writer = GzipEncoder::new(chunk_encoder);
    let mut buffer: [u8; 64] = [0; 64];
    loop {
        let r = match file.read(&mut buffer) {
            Ok(r) => r,
            Err(error) => {
                println!("Error while reading file: {}", error);
                return Err(Box::new(error));
            }
        };
        if r == 0 {
            break;
        }
        writer.write_all(&buffer[0..r]).await?;
    }
    writer.shutdown().await?;
    let mut res = writer.into_inner();
    res.end().await?; // This should send the final chunk
    res.stream.flush().await?;
    Ok(())
}
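One detail in poll_write above that may be worth checking: the `Poll::Ready(_)` arm discards both any error and the byte count returned by the vectored write. The sketch below is a synchronous analogy using only `std::io`, on the assumption that the async `poll_write_vectored` has the same "may accept fewer bytes than offered" contract as `std::io::Write::write_vectored`. `Trickle` and `write_chunk` are hypothetical names for illustration, and this is not a confirmed diagnosis of the RST.

```rust
use std::io::{self, IoSlice, Write};

/// Hypothetical writer that accepts at most `limit` bytes per call,
/// standing in for a socket whose send buffer is nearly full.
struct Trickle {
    out: Vec<u8>,
    limit: usize,
}

impl Write for Trickle {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = buf.len().min(self.limit);
        self.out.extend_from_slice(&buf[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Writes one chunk, looping until every byte of every part is accepted.
fn write_chunk<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    let header = format!("{:X}\r\n", payload.len());
    let parts: [&[u8]; 3] = [header.as_bytes(), payload, b"\r\n"];
    for part in parts {
        w.write_all(part)?;
    }
    Ok(())
}

fn main() -> io::Result<()> {
    // A single vectored call is allowed to stop early, so its return
    // value has to be compared against the total that was offered.
    let mut sock = Trickle { out: Vec::new(), limit: 4 };
    let parts = [
        IoSlice::new(b"5\r\n"),
        IoSlice::new(b"hello"),
        IoSlice::new(b"\r\n"),
    ];
    let total: usize = parts.iter().map(|p| p.len()).sum();
    let written = sock.write_vectored(&parts)?;
    assert!(written < total);

    // Looping over the parts delivers the whole chunk despite short writes.
    let mut sock = Trickle { out: Vec::new(), limit: 4 };
    write_chunk(&mut sock, b"hello")?;
    assert_eq!(sock.out, b"5\r\nhello\r\n");
    Ok(())
}
```

If the same thing can happen on the real socket, reporting `buf.len()` as written after a short vectored write would leave a truncated chunk on the wire, so the async version would presumably need to track how much of the header, payload, and trailer has actually gone out.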
Suggestions and criticisms are welcome and invited.