What to pin when implementing AsyncRead?

Hi,

Could someone help me understand what I have to pin for this code to compile?

I am not sure that wrapping a method call in Box::pin (the compiler's suggestion) makes sense.

use futures::{
    io::{AsyncRead},
    Future, FutureExt,
};
use std::io::Result;

/// Implements `AsyncRead + AsyncSeek` for a non-blocking function that reads ranges of bytes.
pub struct RangedStreamer<F: Future> {
    pos: u64,
    length: u64,     // total size
    buffer: Vec<u8>, // a ring
    offset: usize,   // offset in the ring: buffer[:offset] have been read
    range_fn: Box<dyn Fn(usize, &mut [u8]) -> F>,
}

impl<F: Future> RangedStreamer<F> {
    pub fn new(
        length: usize,
        range_fn: Box<dyn Fn(usize, &mut [u8]) -> F>,
        mut buffer: Vec<u8>,
    ) -> Self {
        let length = length as u64;
        buffer.clear();
        Self {
            pos: 0,
            range_fn,
            length,
            buffer,
            offset: 0,
        }
    }

    async fn read_more(&mut self, to_consume: usize) -> Result<()> {
        todo!()
    }
}

impl<F: Future> AsyncRead for RangedStreamer<F> {
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut [u8],
    ) -> std::task::Poll<Result<usize>> {
        let to_consume = buf.len();
        match self.read_more(to_consume).poll_unpin(cx) {
            std::task::Poll::Ready(x) => {
                std::task::Poll::Ready(x.map(|_| {
                    // copy from the internal buffer.
                    buf[..to_consume]
                        .copy_from_slice(&self.buffer[self.offset..self.offset + to_consume]);
                    // and offset
                    self.offset += to_consume;
                    to_consume
                }))
            }
            std::task::Poll::Pending => std::task::Poll::Pending,
        }
    }
}



Errors:

   Compiling playground v0.0.1 (/playground)
error[E0277]: `from_generator::GenFuture<[static generator@src/lib.rs:35:68: 37:6 {}]>` cannot be unpinned
  --> src/lib.rs:47:42
   |
35 |     async fn read_more(&mut self, to_consume: usize) -> Result<()> {
   |                                                         ---------- within this `impl futures::Future`
...
47 |         match self.read_more(to_consume).poll_unpin(cx) {
   |                                          ^^^^^^^^^^ within `impl futures::Future`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@src/lib.rs:35:68: 37:6 {}]>`
   |
   = note: consider using `Box::pin`
   = note: required because it appears within the type `impl futures::Future`
   = note: required because it appears within the type `impl futures::Future`

error: aborting due to previous error

For more information about this error, try `rustc --explain E0277`.
error: could not compile `playground`

To learn more, run the command again with --verbose.

You have to store the future in your struct for this to be correct. Otherwise you drop the future before it has returned Poll::Ready, which won't work. In general, the design you are attempting here will be difficult to get working: the future returned by an async fn that takes &mut self borrows the struct, so storing that future inside the same struct makes the struct self-referential, which requires some arcane unsafe code.
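To make "store the future in your struct" concrete, here is a minimal std-only sketch. Reader, BoxFut and make_request are hypothetical names (BoxFut has the same shape as futures::future::BoxFuture). The only point is that the in-flight future is kept in a field between calls, so a Pending result does not throw away the work already done.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Boxed, type-erased future (the same shape as `futures::future::BoxFuture`).
pub type BoxFut = Pin<Box<dyn Future<Output = io::Result<Vec<u8>>> + Send>>;

pub struct Reader {
    /// The in-flight request. Keeping it here means it survives `Poll::Pending`
    /// instead of being dropped at the end of each `poll_*` call.
    pending: Option<BoxFut>,
    /// Hypothetical factory that starts a new request.
    make_request: fn() -> BoxFut,
}

impl Reader {
    pub fn new(make_request: fn() -> BoxFut) -> Self {
        Self { pending: None, make_request }
    }

    /// Poll the stored request, starting one first if none is in flight.
    pub fn poll_fetch(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<Vec<u8>>> {
        let fut = self.pending.get_or_insert_with(self.make_request);
        match fut.as_mut().poll(cx) {
            Poll::Ready(out) => {
                // Finished: clear the slot so the next call starts a new request.
                self.pending = None;
                Poll::Ready(out)
            }
            // Not finished: keep the future so the next poll resumes it.
            Poll::Pending => Poll::Pending,
        }
    }
}
```

Because the request comes from a factory instead of borrowing the struct, the stored future is 'static and the struct does not become self-referential.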


Thanks a lot, @alice. Which design would work better, then? A channel?

RangedStreamer is essentially an async source of bytes that supports seeking, built on a generic "range query" function range_fn. The future that queries range_fn will inevitably borrow self, because range_fn needs to populate the inner buffer.

The easiest approach would be to write something that implements Stream instead of AsyncRead. One easy way to do this is with the async-stream crate. Then you can simply wrap it in a StreamReader (from the tokio-util crate) to get an AsyncRead.

Ok, maybe it is easier to explain the broader problem.

rust-s3 (a client for AWS S3) has a non-blocking function bucket.get_object_range(path, start, end) that yields a Vec<u8> with the requested range. This is general-purpose functionality.

Parquet is a file format that supports seeking to specific parts of the file, e.g. to fetch only a subset of the rows. The footer of the file declares where each part of the file is located.

Thus, to read parquet from S3, it helps to have something that implements AsyncRead + AsyncSeek, where seeking moves the position and reading calls bucket.get_object_range starting at that position.

The range_fn was my attempt to abstract away bucket.get_object_range(path, start, end). The struct just buffers the result to avoid many small requests. But, simplifying a bit:

My hope was to arrive at something like:

use futures::io::{AsyncRead, AsyncSeek};
use parquet2::read::read_metadata_async;
use s3::Bucket;

use std::io::{Result, SeekFrom};

/// Implements `AsyncRead + AsyncSeek` for a non-blocking function that reads bytes.
pub struct RangedStreamer {
    pos: u64,
    length: u64, // total size
    bucket: Bucket,
    path: String,
}

async fn read_s3(start: u64, length: usize, bucket: Bucket, path: String) -> Result<Vec<u8>> {
    let (mut data, _) = bucket
        .get_object_range(&path, start, Some(start + length as u64))
        .await
        .map_err(|x| std::io::Error::new(std::io::ErrorKind::Other, x.to_string()))?;
    data.truncate(length);
    Ok(data)
}

impl RangedStreamer {
    pub fn new(length: usize, bucket: Bucket, path: String) -> Self {
        let length = length as u64;
        Self {
            pos: 0,
            length,
            path,
            bucket,
        }
    }
}

impl AsyncRead for RangedStreamer {
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut [u8],
    ) -> std::task::Poll<Result<usize>> {
        let to_consume = buf.len();
        let f = read_s3(self.pos, to_consume, self.bucket.clone(), self.path.clone());
        std::pin::Pin::new(&mut f).poll_read(cx).map(|x| {
            x.map(|values: Vec<u8>| {
                // copy from the internal buffer.
                buf[..to_consume].copy_from_slice(&values);
                to_consume
            })
        })
    }
}

impl AsyncSeek for RangedStreamer {
    fn poll_seek(
        mut self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
        pos: std::io::SeekFrom,
    ) -> std::task::Poll<Result<u64>> {
        match pos {
            SeekFrom::Start(pos) => self.pos = pos,
            SeekFrom::End(pos) => self.pos = (self.length as i64 + pos) as u64,
            SeekFrom::Current(pos) => self.pos = (self.pos as i64 + pos) as u64,
        };
        std::task::Poll::Ready(Ok(self.pos))
    }
}

#[tokio::main]
async fn main() {
    let bucket_name = "ursa-labs-taxi-data";
    let region = "us-east-2".parse().unwrap();
    let bucket = Bucket::new_public(bucket_name, region).unwrap();
    let path = "2009/01/data.parquet".to_string();

    let (data, _) = bucket.head_object_blocking(&path).unwrap();
    let length = data.content_length.unwrap() as usize;

    let mut reader = stream1::RangedStreamer::new(length, bucket, path);

    let metadata = read_metadata_async(&mut reader).await.unwrap();
}

where read_metadata_async<R> expects R: AsyncRead + AsyncSeek + Unpin + Send.

Alright, this explanation makes it a lot easier to help you. You are going to want a struct that looks something like this:

use futures::future::BoxFuture;
use s3::Bucket;
use std::io::Result;

pub struct RangedStreamer {
    pos: u64,
    length: u64, // total size
    state: State,
    path: String,
}

enum State {
    HasChunk {
        start: u64,
        data: Vec<u8>,
        bucket: Bucket,
    },
    Seeking {
        future: BoxFuture<'static, Result<SeekOutput>>,
    }
}

struct SeekOutput {
    start: u64,
    bucket: Bucket,
    data: Vec<u8>,
}

Then you can implement the operations in the following manner:

Seeking

I would not actually do the seek in your seek method. Just update the value of pos to the new position and return immediately. You can perform the seek when a read is requested.
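A sketch of that seek step, assuming std::io::SeekFrom semantics. The hypothetical helper resolve_seek only computes the new position and performs no I/O. It also uses checked arithmetic, because the cast-based poll_seek posted earlier in the thread would silently wrap a negative target into a huge u64.

```rust
use std::io::{self, SeekFrom};

/// Compute the new position for a seek without doing any I/O.
/// Negative or overflowing targets are rejected instead of wrapping.
pub fn resolve_seek(pos: u64, length: u64, target: SeekFrom) -> io::Result<u64> {
    let new_pos = match target {
        SeekFrom::Start(p) => Some(p),
        SeekFrom::End(off) => apply_offset(length, off),
        SeekFrom::Current(off) => apply_offset(pos, off),
    };
    new_pos.ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "seek to a negative or overflowing position",
        )
    })
}

/// `base + off` with overflow/underflow reported as `None`.
fn apply_offset(base: u64, off: i64) -> Option<u64> {
    if off >= 0 {
        base.checked_add(off as u64)
    } else {
        base.checked_sub(off.unsigned_abs())
    }
}
```

poll_seek can then store the result in self.pos and return Poll::Ready right away; the actual request happens in the next poll_read.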

Reading

When reading, you first check if the state is Seeking. If it is, you poll the future, immediately returning Pending if the future returns Pending. If the future completes, change the state to HasChunk with the return value of the seek and proceed as if the state had been HasChunk when initially called. Do not return Pending when the future completes.

Next, you check whether pos is within the range covered by the current chunk. If it is, you simply read as much as you can, update pos and return Poll::Ready immediately. This might move pos to the end of the current chunk, in which case the next call to poll_read will behave as if you had just issued a seek to that position.
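The range check can be written as a small helper. offset_in_chunk is a hypothetical name; it treats a chunk as the half-open interval [start, start + len), so a pos exactly at the end of the chunk counts as out of range and triggers a new request, matching the behavior described above.

```rust
/// Position of `pos` inside a buffered chunk covering `[start, start + len)`,
/// or `None` if `pos` lies outside it and a new request is needed.
pub fn offset_in_chunk(pos: u64, start: u64, len: usize) -> Option<usize> {
    let end = start.checked_add(len as u64)?;
    if pos >= start && pos < end {
        Some((pos - start) as usize)
    } else {
        None
    }
}
```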

If pos is outside the current chunk, then you call your read_s3 method and update the state to be Seeking with future set to Box::pin(the_future_returned_by_read_s3). Now, you should go back to the top of poll_read by either wrapping its contents in a loop or by having it call itself recursively. Do not return Pending without first polling the new future.

You may need to clone the Bucket or introduce a state with no fields to obtain ownership of the Bucket, which is required to call read_s3. You will need to update the return value of read_s3 to Result<SeekOutput>.
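Putting the Seeking and Reading steps together, a std-only sketch of the loop might look like the following. It is not the code from the thread. It assumes a hypothetical Fetch closure in place of read_s3 (so the Bucket lives inside the closure rather than in State), writes poll_read as an inherent method with the same signature as futures::io::AsyncRead::poll_read so it compiles without the futures crate, and adds two guards the description leaves implicit: reads at or past length return Ok(0), and an empty response is treated as end of data instead of being refetched forever.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

/// Output of a finished range request (like `SeekOutput`, without the bucket).
pub struct SeekOutput {
    pub start: u64,
    pub data: Vec<u8>,
}

pub type BoxFut = Pin<Box<dyn Future<Output = io::Result<SeekOutput>> + Send>>;
/// Hypothetical stand-in for `read_s3`: fetch up to `len` bytes starting at `start`.
pub type Fetch = Arc<dyn Fn(u64, usize) -> BoxFut + Send + Sync>;

enum State {
    HasChunk { start: u64, data: Vec<u8> },
    Seeking { future: BoxFut },
}

pub struct RangedStreamer {
    pos: u64,
    length: u64, // total size of the object
    state: State,
    fetch: Fetch,
}

impl RangedStreamer {
    pub fn new(length: u64, fetch: Fetch) -> Self {
        // An empty chunk means the first read always issues a request.
        let state = State::HasChunk { start: 0, data: Vec::new() };
        Self { pos: 0, length, state, fetch }
    }

    /// Same signature as `futures::io::AsyncRead::poll_read`.
    pub fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = &mut *self; // allowed because RangedStreamer is Unpin
        if buf.is_empty() || this.pos >= this.length {
            return Poll::Ready(Ok(0)); // nothing requested, or end of object
        }
        loop {
            match &mut this.state {
                State::Seeking { future } => {
                    let result = match future.as_mut().poll(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(result) => result,
                    };
                    // The future has completed and must not be polled again.
                    this.state = State::HasChunk { start: this.pos, data: Vec::new() };
                    match result {
                        Err(e) => return Poll::Ready(Err(e)),
                        // An empty response would make the loop refetch forever.
                        Ok(out) if out.data.is_empty() => return Poll::Ready(Ok(0)),
                        // Do not return Pending here: loop and serve the read.
                        Ok(out) => {
                            this.state = State::HasChunk { start: out.start, data: out.data };
                        }
                    }
                }
                State::HasChunk { start, data } => {
                    let end = *start + data.len() as u64;
                    if this.pos >= *start && this.pos < end {
                        // In range: copy what the chunk has left, capped by buf.
                        let offset = (this.pos - *start) as usize;
                        let n = (data.len() - offset).min(buf.len());
                        buf[..n].copy_from_slice(&data[offset..offset + n]);
                        this.pos += n as u64;
                        return Poll::Ready(Ok(n));
                    }
                    // Out of range: start a request at pos, poll it on the next iteration.
                    let len = buf.len().min((this.length - this.pos) as usize);
                    let future = (this.fetch)(this.pos, len);
                    this.state = State::Seeking { future };
                }
            }
        }
    }
}
```

In a real implementation this body would go in impl AsyncRead for RangedStreamer. The loop is the "go back to the top of poll_read" step, and the state is reset before a finished future's result is inspected, so that future is never polled again after completion.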


This is wonderful. Thank you so much. I was able to generalize it so that it does not depend on s3 specifically:

pub type F = std::sync::Arc<
    dyn Fn(u64, usize) -> BoxFuture<'static, std::io::Result<SeekOutput>> + Send + Sync,
>;

pub struct RangedStreamer {
    pos: u64,
    length: u64, // total size
    state: State,
    range_get: F,
    min_request_size: usize, // requests have at least this size
}

pub struct SeekOutput {
    pub start: u64,
    pub data: Vec<u8>,
}

// in main
{
    let range_get = std::sync::Arc::new(move |start: u64, length: usize| {
        // Clone per call so that the returned future owns its data and is 'static.
        let bucket = bucket.clone();
        let path = path.clone();
        Box::pin(async move {
            let (mut data, _) = bucket
                .get_object_range(&path, start, Some(start + length as u64))
                .await
                .map_err(|x| std::io::Error::new(std::io::ErrorKind::Other, x.to_string()))?;

            data.truncate(length);
            Ok(SeekOutput { start, data })
        }) as BoxFuture<'static, std::io::Result<SeekOutput>>
    });
}
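The thread does not show how min_request_size is used. One plausible reading of its comment ("requests have at least this size") is sketched below; request_len is a hypothetical helper that pads a request up to min_request_size but never asks for bytes beyond the end of the object.

```rust
/// Hypothetical helper: how many bytes to request when a read of `wanted`
/// bytes at `pos` misses the buffered chunk. The request is padded up to
/// `min_request_size` to avoid many small requests, but is clamped so it
/// never extends past `length`, the total size of the object.
pub fn request_len(pos: u64, wanted: usize, min_request_size: usize, length: u64) -> usize {
    let remaining = length.saturating_sub(pos);
    let desired = wanted.max(min_request_size) as u64;
    desired.min(remaining) as usize
}
```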

Regardless, the solution is beautiful and made it possible to support async reads of parquet without committing to a particular runtime: "Add support to read async" by jorgecarleitao, Pull Request #33, jorgecarleitao/parquet2 on GitHub. Thanks a lot for your help, alice.

I don't understand why range_get is in an Arc. I would probably use a Box<dyn FnMut(u64, usize) -> ...> instead.
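A sketch of the Box<dyn FnMut> alternative, assuming the closure is only ever called from poll_read, which already has &mut self. RangeFn, BoxFut and start_request are hypothetical names. In this sketch only Send is put on the closure, which keeps the streamer Send as read_metadata_async requires; shared ownership and Sync are not needed.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;

pub type BoxFut = Pin<Box<dyn Future<Output = io::Result<Vec<u8>>> + Send>>;

/// Owned, mutable range function: no `Arc`, no `Sync`.
pub type RangeFn = Box<dyn FnMut(u64, usize) -> BoxFut + Send>;

pub struct RangedStreamer {
    pos: u64,
    range_get: RangeFn,
}

impl RangedStreamer {
    pub fn new(range_get: RangeFn) -> Self {
        Self { pos: 0, range_get }
    }

    /// Start a request at the current position. In the full design this is
    /// where `poll_read` would switch to the `Seeking` state; `&mut self` is
    /// enough to call an `FnMut`.
    pub fn start_request(&mut self, len: usize) -> BoxFut {
        (self.range_get)(self.pos, len)
    }
}
```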

Also, it seems like you are seeking unnecessarily when output.data is shorter than buf.len(). To handle this, the number of bytes you read should be min(output.data.len() - offset, buf.len()).
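The suggested fix amounts to allowing short reads. For example, with a 10-byte chunk and offset 4, a read into a 100-byte buffer should return the remaining 6 bytes instead of issuing a new request. A minimal sketch, with copy_from_chunk as a hypothetical helper name:

```rust
/// Copy as many bytes as possible from `chunk[offset..]` into `buf` and
/// return how many were copied: min(chunk.len() - offset, buf.len()).
/// An offset at or past the end of the chunk copies nothing.
pub fn copy_from_chunk(chunk: &[u8], offset: usize, buf: &mut [u8]) -> usize {
    let rest = chunk.get(offset..).unwrap_or(&[]);
    let n = rest.len().min(buf.len());
    buf[..n].copy_from_slice(&rest[..n]);
    n
}
```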