I'm trying to implement a `Stream` by hand, based on the answer to this question. It seems to me that my safety logic is sound, since I only ever access `conn` or `inp` after checking that the current future is `None` (pursuant to Alice Ryhl's comment that UB comes from accessing fields referred to by the future while it still exists).
The problem is that `Next` requires its inner type to be `Unpin`, which this solution specifically relies on being untrue. I have just learned about the async-stream crate, which would obviously be much easier, but I would rather not pay for message passing if I can avoid it. Is there any way to fix this code? I can't take ownership of the `Connection` because the connections come from a `deadpool::managed::Pool`.
Also, I added this `Unpin` bound to `C` early in development because I figured it was auto-implemented and couldn't hurt, but I have since learned that was a bad call. I don't think my codebase relies on it, and I could remove the bound with a little work, but that wouldn't fix the current problem.
If it isn't obvious, this is my first time dealing with async at such a low level, so I'm a little lost here.
pub struct Query<'conn, T, C>
where
    T: ProtocolEncoding,
    C: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    conn: &'conn mut Connection<T, C>,
    inp: GenQueryInp,
    last_page: bool,
    rows_processed: u32,
    current_page: Option<Vec<Vec<String>>>,
    query_future: Option<BoxFuture<'static, Result<GenQueryOut, IrodsError>>>,
    _pinned: PhantomPinned,
}
impl<'conn, T, C> Query<'conn, T, C>
where
    T: ProtocolEncoding,
    C: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    pub fn new(conn: &'conn mut Connection<T, C>, inp: GenQueryInp) -> Self {
        Self {
            conn,
            inp,
            last_page: false,
            rows_processed: 0,
            current_page: None,
            query_future: None,
            _pinned: PhantomPinned,
        }
    }
}
impl<'conn, T, C> Stream for Query<'conn, T, C>
where
    T: ProtocolEncoding + Send + Sync,
    C: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + Sync,
{
    type Item = Result<Vec<String>, IrodsError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Intent: `this` is never moved out of, only mutated in place.
        let this = unsafe { Pin::into_inner_unchecked(self) };
        if this.current_page.is_some() {
            // Do we have a page ready?
            let page = unsafe { this.current_page.as_mut().unwrap_unchecked() };
            match page.pop() {
                // Does this page have another row?
                Some(row) => {
                    this.rows_processed += 1;
                    return Poll::Ready(Some(Ok(row)));
                }
                None => {
                    // If not, start polling for another page
                    this.current_page = None;
                }
            }
        }
        if this.query_future.is_none() {
            if this.last_page {
                return Poll::Ready(None);
            }
            if this.rows_processed >= this.inp.max_rows {
                return Poll::Ready(None);
            }
            let fut: BoxFuture<Result<GenQueryOut, IrodsError>> =
                this.conn.query(&this.inp).boxed();
            // Erase the borrow of `conn` and `inp` so the future can be stored in `self`.
            let fut: BoxFuture<'static, Result<GenQueryOut, IrodsError>> =
                unsafe { std::mem::transmute(fut) };
            this.query_future = Some(fut);
        }
        match unsafe { this.query_future.as_mut().unwrap_unchecked().poll_unpin(cx) } {
            Poll::Ready(out_result) => {
                // The future has completed either way; drop it before touching other fields.
                this.query_future = None;
                match out_result {
                    Ok(out) => {
                        let mut page: Vec<Vec<String>> = out.into_page_of_rows();
                        match page.pop() {
                            Some(row) => {
                                this.rows_processed += 1;
                                // Keep the remaining rows for later polls.
                                this.current_page = Some(page);
                                return Poll::Ready(Some(Ok(row)));
                            }
                            None => {
                                this.current_page = None;
                            }
                        }
                    }
                    Err(err) => return Poll::Ready(Some(Err(err))),
                }
            }
            Poll::Pending => return Poll::Pending,
        };
        Poll::Ready(None)
    }
}
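For comparison, here is a minimal sketch of one way the self-reference might be avoided without owning the connection: move the `&mut` borrow into the boxed future and have the future hand it back together with the page. This is roughly the shape that `futures::stream::unfold` gives you. Everything named here (`FakeConn`, `Pages`, `State`, `drain`) is a hypothetical stand-in using only `std`, not the real `Connection` or `Query` API.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the real connection type.
pub struct FakeConn {
    pages: Vec<Vec<String>>,
}

impl FakeConn {
    // Stand-in for a paged query; yields `None` once no pages remain.
    async fn query(&mut self) -> Option<Vec<String>> {
        self.pages.pop()
    }
}

// The future gives the borrowed connection back alongside its result.
type PageFuture<'c> =
    Pin<Box<dyn Future<Output = (&'c mut FakeConn, Option<Vec<String>>)> + 'c>>;

enum State<'c> {
    Idle(&'c mut FakeConn),
    Busy(PageFuture<'c>),
    Done,
}

pub struct Pages<'c> {
    state: State<'c>,
}

impl<'c> Pages<'c> {
    pub fn new(conn: &'c mut FakeConn) -> Self {
        Self { state: State::Idle(conn) }
    }

    // Same shape as `Stream::poll_next`, but on `&mut self`:
    // every field is `Unpin`, so no pinning tricks are needed.
    pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<String>>> {
        loop {
            match std::mem::replace(&mut self.state, State::Done) {
                State::Idle(conn) => {
                    // The borrow lives inside the future while the query runs.
                    self.state = State::Busy(Box::pin(async move {
                        let page = conn.query().await;
                        (conn, page)
                    }));
                }
                State::Busy(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready((conn, Some(page))) => {
                        self.state = State::Idle(conn);
                        return Poll::Ready(Some(page));
                    }
                    Poll::Ready((_, None)) => return Poll::Ready(None),
                    Poll::Pending => {
                        self.state = State::Busy(fut);
                        return Poll::Pending;
                    }
                },
                State::Done => return Poll::Ready(None),
            }
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Drives the sketch to completion. The fake query never returns
// `Pending`, so a plain loop is enough here.
pub fn drain(conn: &mut FakeConn) -> Vec<Vec<String>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut pages = Pages::new(conn);
    let mut out = Vec::new();
    while let Poll::Ready(Some(page)) = pages.poll_next(&mut cx) {
        out.push(page);
    }
    out
}
```

Because `Pages` contains only a reference and a `Pin<Box<_>>`, it is `Unpin`, so a real `Stream` impl could presumably call this logic through `self.get_mut()` and `next()` would work without `pin_mut!`. The cost is one boxed future per page, which the original code pays as well.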
Update: I've made this, which... compiles? The error I get when I compile my real code complains about the inner type of `Next` not being `Unpin`, which I assumed came from the `PhantomPinned`, so now I'm doubly confused.
Further update: I have (re)discovered that the issue actually came from iterating over the stream:
let query = Query::new(&mut conn, inp);
pin_mut!(query);
while let Some(result) = query.next().await {
    println!("{:?}", result);
}
What I was missing before was `pin_mut!` on the stream, which I discovered in the async-stream docs.
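As a rough illustration of why pinning makes the `Unpin` error go away: `pin_mut!(query)` shadows `query` with a `Pin<&mut Query>`, and that handle is itself `Unpin` even though `Query` is not. `StreamExt::next` requires `Self: Unpin`, and `Pin<&mut S>` implements `Stream` whenever `S` does, so the call goes through the pinned handle. The sketch below shows the same mechanics with `std::pin::pin!` and a hypothetical `Counter` type in place of `Query`; `step` only mimics the receiver type of `poll_next`.

```rust
use std::marker::PhantomPinned;
use std::pin::{pin, Pin};

// Hypothetical `!Unpin` type standing in for `Query`.
pub struct Counter {
    n: u32,
    _pinned: PhantomPinned,
}

impl Counter {
    // Mirrors the receiver of `Stream::poll_next`: callable only through a `Pin`.
    pub fn step(self: Pin<&mut Self>) -> u32 {
        // SAFETY: we only mutate a plain integer and never move out of `self`.
        let this = unsafe { self.get_unchecked_mut() };
        this.n += 1;
        this.n
    }
}

// Compiles only when `T: Unpin`.
fn assert_unpin<T: Unpin>(_: &T) {}

pub fn demo() -> u32 {
    let counter = Counter { n: 0, _pinned: PhantomPinned };
    // assert_unpin(&counter); // would not compile: `Counter` is `!Unpin`

    // Roughly what `pin_mut!(counter)` does: pin the value in place on the
    // stack and shadow the name with a `Pin<&mut Counter>`.
    let mut counter: Pin<&mut Counter> = pin!(counter);

    // The pinned handle is `Unpin`, which is what an `Unpin` bound checks.
    assert_unpin(&counter);

    counter.as_mut().step();
    counter.as_mut().step()
}
```

After pinning, the original value can no longer be named or moved, which is the guarantee a self-referential type depends on.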
So I suppose my code "works," although I have fundamentally no idea what many of the components mean. As such, I will almost certainly opt for async-stream, but I would still like to understand whether the original code is actually sound, what in the world `pin_mut!` is doing, etc.
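One detail that may be worth checking as part of the soundness question: Rust drops struct fields in declaration order. In the struct above, `inp` is declared before `query_future`, so if a `Query` is dropped while a query is in flight, `inp` is dropped while the stored future still holds a reference to it. The sketch below only demonstrates the ordering rule; `Noisy`, `Holder`, and `drop_order` are hypothetical names for illustration.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Records its name in a shared log when dropped.
struct Noisy {
    name: &'static str,
    log: Rc<RefCell<Vec<&'static str>>>,
}

impl Drop for Noisy {
    fn drop(&mut self) {
        self.log.borrow_mut().push(self.name);
    }
}

// Field order mirrors the relevant part of `Query`.
#[allow(dead_code)]
struct Holder {
    inp: Noisy,
    query_future: Noisy,
}

pub fn drop_order() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let holder = Holder {
        inp: Noisy { name: "inp", log: Rc::clone(&log) },
        query_future: Noisy { name: "query_future", log: Rc::clone(&log) },
    };
    // Fields are dropped in declaration order: `inp` first.
    drop(holder);
    let order = log.borrow().to_vec();
    order
}
```

If that ordering matters for the original type, declaring `query_future` before `inp` would make the future go away first.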