How can I make this method Stream agnostic?

I have the following method:

async fn transfer_all(stream: &mut TcpStream) -> Result<Vec<Vec<u8>>, Box<dyn std::error::Error>> {
  let mut packets: Vec<Vec<u8>> = Vec::new();
  let mut header = true;
  let mut length: usize = 0;

  let mut packet: Vec<u8> = Vec::new();
  loop {
    stream.readable().await?;

    if header {
      length = 5;
      packet.clear();
      packet.shrink_to_fit();
      packet.reserve(length);
    }

    let mut buf: Vec<u8> = vec![0u8; length];
    match stream.try_read(&mut buf) {
      Ok(0) => {
        break;
      }
      Ok(n) => {
        if header {
          length = u32::from_be_bytes(pop(&buf[1..])) as usize - 4;
          header = false;
          packet.append(&mut buf);
          packet.reserve(length);
          continue;
        }
        packet.append(&mut buf);
        packets.push(packet.clone());
        header = true;
      }
      Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
        break;
      }
      Err(e) => {
        return Err(e.into());
      }
    }
  }

  Ok(packets)
}

It works with TcpStream but I need to also make it work with UnixStream. Since this is a fairly convoluted state machine I'd rather not have two implementations. I tried to make it generic: async fn transfer_all<T: Read>(stream: &mut T)... but I can't get the compiler to recognize try_read. How can I make this method work with TcpStream and UnixStream?

Assuming you use tokio, you could try to re-write your function only using API provided by AsyncReadExt. If that works for you, you’d need a T: AsyncRead + Unpin bound, and need to have tokio::io::AsyncReadExt in scope (with a use statement). I guess something like this should be more-or-less equivalent to your code (applying my best guesses as to what the pop function does):

use std::convert::TryInto;
use std::io;
use tokio::io::{AsyncRead, AsyncReadExt};
fn pop(x: &[u8]) -> [u8; 4] { x[0..3].try_into().unwrap() }

async fn transfer_all<S: AsyncRead + Unpin>(stream: &mut S) -> Result<Vec<Vec<u8>>, Box<dyn std::error::Error>> {
  let mut packets: Vec<Vec<u8>> = Vec::new();
  let mut header = true;
  let mut length: usize = 0;

  let mut packet: Vec<u8> = Vec::new();
  loop {
    if header {
      length = 5;
      packet.clear();
      packet.shrink_to_fit();
      packet.reserve(length);
    }

    let mut buf: Vec<u8> = vec![0u8; length];
    match stream.read(&mut buf).await {
      Ok(0) => {
        break;
      }
      Ok(n) => {
        if header {
          length = u32::from_be_bytes(pop(&buf[1..])) as usize - 4;
          header = false;
          packet.append(&mut buf);
          packet.reserve(length);
          continue;
        }
        packet.append(&mut buf);
        packets.push(packet.clone());
        header = true;
      }
      Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
        break;
      }
      Err(e) => {
        return Err(e.into());
      }
    }
  }

  Ok(packets)
}

(playground)

The only difference is that your version can wait with allocating the Vec until it is actually needed, while the call to .read needs the Vec available before it starts waiting for it to become readable; however, I think you could consider re-structuring the code in a way that re-allocating the buf in every loop is avoided, anyways.

This seems to block on match stream.read(&mut buf).await { when there's no more data to read, which is what I wanted to avoid.

One option would be to define a new trait with try_read and readable methods. Implement it for both TcpStream and UnixStream. And have the implementations just call readable and try_read calls from the relevant type.

How do I make a UnixStream not block when there's no more data to read?

Like File and TcpStream, it returns Ok(0) on end.

If n is 0, then it can indicate one of two scenarios:

  1. This reader has reached its "end of file" and will likely no longer be able to produce bytes. Note that this does not mean that the reader will always no longer be able to produce bytes.
  2. The buffer specified was 0 bytes in length.

Sorry for coming back so late. readable() is an async function. How can I put that into a trait?

I generally use the async-trait crate for that which makes it easy. Quoting the readme, it works as follows.

Async fns get transformed into methods that return Pin<Box<dyn Future + Send + 'async_trait>> and delegate to a private async freestanding function.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.