I'm trying to use an OwningHandle (from the owning_ref crate) to combine ownership of a byte array with ownership of a future that uses that byte array. This is a toy example; the real use case is a larger program that reuses aligned I/O buffers for async disk reads and writes. The goal is to bundle the buffer and the future into a single object that I can put into vectors to track operations in flight, and to implement futures::Stream types that consume those objects and yield the results.
The code below doesn't compile, with the error:
error[E0599]: no method named `as_mut` found for reference `&OwningHandle<Vec<u8>, Pin<Box<dyn futures::Future<Output = Result<usize, std::io::Error>> + std::marker::Send>>>` in the current scope
  --> src/future_test.rs:28:39
   |
28 |         let inner_fut = future_handle.as_mut();
   |                                       ^^^^^^ method not found in `&OwningHandle<Vec<u8>, Pin<Box<dyn futures::Future<Output = Result<usize, std::io::Error>> + std::marker::Send>>>`

error: aborting due to previous error; 6 warnings emitted
It seems like I should be able to deref the OwningHandle to get at the BoxFuture, and then call .as_mut() on the BoxFuture to get a Pin<&mut dyn Future> that I can poll. I've tried a few permutations of dereferencing the OwningHandle explicitly with .deref(), but haven't found anything that works.
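To make the intent concrete, here is a minimal std-only sketch of the polling step I'm after, using a plain Option<Pin<Box<dyn Future>>> with no OwningHandle involved. The names BoxedIo, NoopWaker, poll_slot and poll_slot_once are made up for this illustration, and the sketch assumes a 'static future, so it sidesteps the borrowed-buffer problem rather than solving it.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical alias with the same shape as futures::future::BoxFuture<'static, _>.
type BoxedIo = Pin<Box<dyn Future<Output = std::io::Result<usize>> + Send>>;

// Hypothetical no-op waker, only here so the sketch can build a Context
// without pulling in an executor.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Poll the future stored in `slot`, clearing the slot once it completes.
fn poll_slot(
    slot: &mut Option<BoxedIo>,
    cx: &mut Context<'_>,
) -> Poll<Option<std::io::Result<usize>>> {
    // Option::as_mut yields Option<&mut BoxedIo>; Option::as_ref would only
    // yield a shared reference, which is not enough to poll.
    let fut = match slot.as_mut() {
        None => return Poll::Ready(None),
        Some(fut) => fut,
    };
    // Pin::as_mut turns &mut Pin<Box<dyn Future>> into Pin<&mut dyn Future>.
    match fut.as_mut().poll(cx) {
        Poll::Pending => Poll::Pending,
        Poll::Ready(res) => {
            *slot = None;
            Poll::Ready(Some(res))
        }
    }
}

// Drive the slot once using the no-op waker.
fn poll_slot_once(slot: &mut Option<BoxedIo>) -> Poll<Option<std::io::Result<usize>>> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    poll_slot(slot, &mut cx)
}
```

With this shape, polling a slot that holds an already-completed future returns Poll::Ready(Some(..)) and empties the slot, and a later poll returns Poll::Ready(None). What I can't work out is how to get the same mutable access when the boxed future sits inside an OwningHandle.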
Any help much appreciated! This is my first time trying to use OwningHandle and futures together this way, and it's stretching the limits of how much I understand the types in play here.
extern crate owning_ref;

use std::ops::Deref;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::future::BoxFuture;
use owning_ref::OwningHandle;

// An OwningHandle for bundling a Future together with the Vec that it's
// using as an I/O buffer.
type FutureHandle<'a> = OwningHandle<Vec<u8>, BoxFuture<'a, std::io::Result<usize>>>;

/// A demonstration Stream type that yields the result from a FutureHandle
struct MyStream<'a> {
    future_handle: Option<FutureHandle<'a>>,
}

impl<'a> futures::Stream for MyStream<'a> {
    type Item = std::io::Result<usize>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let future_handle = match self.future_handle.as_ref() {
            None => return Poll::Ready(None),
            Some(fh) => fh,
        };
        let inner_fut = future_handle.deref_mut();
        match inner_fut.poll() {
            Poll::Pending => Poll::Pending,
            Poll::Ready(i) => {
                self.future_handle = None;
                Poll::Ready(i)
            }
        }
    }
}

// A placeholder for an I/O operation: IRL this would be something like a socket read
async fn operate(buffer: &[u8]) -> std::io::Result<usize> {
    let i = buffer[0];
    Ok(1)
}

fn main() {
    let buffer: Vec<u8> = Vec::from([0xff; 16]);
    let handle: FutureHandle<'_> =
        OwningHandle::new_with_fn(buffer, |buffer| -> BoxFuture<'_, std::io::Result<usize>> {
            unsafe {
                let slice = &buffer.as_ref().unwrap()[0..16];
                Box::pin(operate(slice))
            }
        });
    let stream = MyStream {
        future_handle: Some(handle),
    };
}