Pass buf: &[u8] from AsyncWrite::poll_write to an async function (and await it)

I need to await an async function in AsyncWrite::poll_write and pass the buffer to it.

It works perfectly well if I do NOT need to pass references from arguments of AsyncWrite::poll_write to the async function.

Here is what I have now:

// This is the state of async write:
struct AsyncWriteState {
   client:    aws_sdk_s3::Client,
   upload_id: u32, // TODO
}

(yes, it is going to use s3 client to upload a chunk as a multipart upload, but it is not relevant).

Then I define state-machine types, inputs, outputs and the async write() function itself:

// SM stands for State-Machine
// PB stands for Pined Box
type WriteOpSMOutput = (AsyncWriteState, Result<usize, std::io::Error>);
type PBWriteOpSM = std::pin::Pin<Box<dyn std::future::Future<Output = WriteOpSMOutput> + Send>>;
async fn write(mut this: AsyncWriteState) -> WriteOpSMOutput { todo!() }

Then I declare AsyncWrite struct:

struct AsyncWrite {
   ops_state:      Option<AsyncWriteState>,
   write_op_sm:    Option<PBWriteOpSM>,
}

Finally, in poll_write the main magic happens: I ask Rust to generate state-machine out of the async write() function and poll that generated state-machine (aka future):

   fn poll_write(self: std::pin::Pin<&mut Self>,
                 cx: &mut std::task::Context<'_>,
                 buf: &[u8])
                 -> std::task::Poll<Result<usize, std::io::Error>> {
      let this = self.get_mut();
      // If this is the first invocation of poll_write, create state-machine:
      if this.write_op_sm.is_none() {
         let mut ops_state = this.take_state_or_die("write()");
         let state_machine = Box::pin(write(ops_state)) as PBWriteOpSM; // <--- MAGIC HERE
         this.write_op_sm = Some(state_machine);
      }
      // and poll it:
      let state_machine = this.write_op_sm.as_mut().unwrap();
      match state_machine.as_mut().poll(cx) {
         std::task::Poll::Pending => std::task::Poll::Pending,
         std::task::Poll::Ready((ops_state, res)) => {
            this.write_op_sm = None;
            this.ops_state = Some(ops_state);
            std::task::Poll::Ready(res)
         }
      }
   }

It works perfectly fine, apart from one thing: I do not pass buf to the async write() function.

Does anyone know how I can do it?
(of course, without copying the buffer / splitting it into multiple owned chunks)

P.S.

Maybe I could use thread-local variable to pass the buf: &[u8]? But it does not sound like an elegant approach.

poll_write() is not an async function, you cannot use await inside it.

think async fn as a constructor for an opaque Future type. if the async fn takes an argument with lifetime (&[u8] in your example), then the returned Future must have a lifetime parameter too, which means you cannot box it as a Box<dyn Future + 'static>.

unfortunately, I don't know if it's possible to specify lifetime bounds on synthesized futures by async functions. you might have to implement the the state machine by hand.

disclaimer: the code snippet below is to explain the concept, it's not actual valid.

type PBWriteOpSM<'a> = futures::BoxFuture<'a, WriteOpSMOutput>;
fn write<'a>(mut this: AsyncWriteState, buf: &'a [u8]) -> PbWriteOpSM<'a> { todo!() }

but the type signature of poll_write() uses an elided lifetime for the buf parameter, so it doesn't really work at all.

all in all, use async function to authorize state machines is a neat idea, but due to the opaque return type, it's not very composable.

in your example, since the state machine async fn write() takes a buffer, I think you probably want to use futures::io::AsyncWrite instead of a regular Future to represent your state machine, so you can call poll_write() on it.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.