Storing a future in Self that references self

Hi everyone,

Hope everyone has a great day!

I have the following code:

// tokio = { version = "1.34.0", features = ["full"] }
// futures = "0.3.30"
// tokio-stream = "0.1.15"
// pin-project-lite = "0.2"
// anyhow = "1.0.71"
use std::{
    pin::Pin,
    task::{Context, Poll},
};

use futures::{future::BoxFuture, FutureExt, Sink};
use pin_project_lite::pin_project;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    Ok(())
}

pin_project! {
  pub struct CustomSink<'a, T>
  {
      buffer: Vec<T>,
      // Future<Output = Result<(), Error>>
      #[pin]
      future: Option<BoxFuture<'a, Result<(), anyhow::Error>>>,
  }
}

impl<'a, T> CustomSink<'a, T> {
    pub fn new() -> Self {
        Self {
            buffer: Vec::new(),
            future: None,
        }
    }

    pub async fn flush(buffer: Vec<T>) -> Result<(), anyhow::Error> {
        if buffer.is_empty() {
            return Ok(());
        }

        Ok(())
    }

    pub async fn flush_self(&self) -> Result<(), anyhow::Error> {
        if self.buffer.is_empty() {
            return Ok(());
        }

        Ok(())
    }
}

impl<'a, T> Sink<T> for CustomSink<'a, T>
where
    T: Send + Sync + 'a,
{
    type Error = anyhow::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        todo!()
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        todo!()
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = Pin::get_mut(self);
        if this.buffer.len() == 0 {
            return Poll::Ready(Ok(()));
        }

        // This compiles but forces me to pass in the params explicitly.
        // If I want to mutate self inside of `flush` I would not be able to.
        // let future = Self::flush(std::mem::take(&mut this.buffer)).boxed();


        //
        // Flush is supposed to flush the buffer to a database/filesystem/whatever and
        // calls a bunch of async functions. In the process it might need to mutate self (for instance 
        // store an acquired connection, or update the stored connection if it is dead).
        //
        let future = this.flush_self().boxed();

        this.future = Some(future);

        todo!()
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        todo!()
    }
}

The issue is that the code does not compile:

   Compiling rust_playground v0.1.0 (/Users/dario/code/rust_playground)
error: future cannot be sent between threads safely
   --> src/main.rs:74:22
    |
74  |         let future = this.flush_self().boxed();
    |                      ^^^^^^^^^^^^^^^^^ future returned by `flush_self` is not `Send`
    |
    = help: the trait `Sync` is not implemented for `dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send`, which is required by `impl futures::Future<Output = Result<(), anyhow::Error>>: std::marker::Send`
note: captured value is not `Send` because `&` references cannot be sent unless their referent is `Sync`
   --> src/main.rs:40:29
    |
40  |     pub async fn flush_self(&self) -> Result<(), anyhow::Error> {
    |                             ^^^^^ has type `&CustomSink<'_, T>` which is not `Send`, because `CustomSink<'_, T>` is not `Sync`
note: required by a bound in `futures::FutureExt::boxed`
   --> /Users/dario/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/future/mod.rs:516:23
    |
514 |     fn boxed<'a>(self) -> BoxFuture<'a, Self::Output>
    |        ----- required by a bound in this associated function
515 |     where
516 |         Self: Sized + Send + 'a,
    |                       ^^^^ required by this bound in `FutureExt::boxed`

error: could not compile `rust_playground` (bin "rust_playground") due to 1 previous error

I took a look at BoxFuture, and it seems its bounds do not include Sync. Does this mean that CustomSink can never be Sync because BoxFuture is not Sync, so this approach could never work?

My second question: am I on the right track with this approach/pattern, or is there another way to solve this? The alternative would be to pass everything the function needs in as parameters instead of using a self reference. This would force me to clone an Arc which holds the database pool and pass that to the function every time.

Thanks in advance for any input/help! If I happened to have missed any crucial information please let me know as well!

You can take advantage of the fact that futures never need to be shared (Future::poll takes Pin<&mut Self>, never &self) by storing them in Exclusive (unstable) or in the SyncWrapper type from the sync_wrapper crate. These wrappers always implement Sync, safely, by denying the shared access you don't need.
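A std-only sketch of the wrapper idea, for illustration. SyncWrapper below is a hypothetical minimal re-implementation of what the sync_wrapper crate provides (in real code, depend on the crate instead), the local BoxFuture alias has the same shape as futures::future::BoxFuture, and the error type is simplified to String to avoid the anyhow dependency. Because the wrapper only hands out &mut access, the unsafe Sync impl is sound, and the struct becomes Sync even though the stored future is not. This only removes the Send/Sync error; it does not address the self-borrow problem.

```rust
use std::future::Future;
use std::pin::Pin;

// Same shape as `futures::future::BoxFuture`, defined locally so the sketch
// compiles with the standard library alone.
type BoxFuture<'a, O> = Pin<Box<dyn Future<Output = O> + Send + 'a>>;

/// Hypothetical minimal re-implementation of `sync_wrapper::SyncWrapper`,
/// for illustration only.
pub struct SyncWrapper<T>(T);

impl<T> SyncWrapper<T> {
    pub fn new(value: T) -> Self {
        SyncWrapper(value)
    }

    /// The only borrowing accessor takes `&mut self`, so two threads can
    /// never reach the inner value at the same time.
    pub fn get_mut(&mut self) -> &mut T {
        &mut self.0
    }

    pub fn into_inner(self) -> T {
        self.0
    }
}

// SAFETY: `&SyncWrapper<T>` gives no access to the inner `T`, so sharing the
// wrapper between threads can never produce shared access to `T`.
unsafe impl<T> Sync for SyncWrapper<T> {}

/// The question's struct with the stored future wrapped.
pub struct CustomSink<'a, T> {
    pub buffer: Vec<T>,
    pub future: SyncWrapper<Option<BoxFuture<'a, Result<(), String>>>>,
}

impl<'a, T> CustomSink<'a, T> {
    pub fn new() -> Self {
        Self {
            buffer: Vec::new(),
            future: SyncWrapper::new(None),
        }
    }
}

/// Compiles only if `S: Sync`.
pub fn assert_sync<S: Sync>() {}

pub fn check() {
    // With a bare `Option<BoxFuture<..>>` field this line would not compile,
    // because `dyn Future + Send` is not `Sync`.
    assert_sync::<CustomSink<'static, u32>>();
}
```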

Your approach will not work, because the Rust language does not support declaring self-referential structs. Pin provides the condition for such structs to be sound, but there is no syntax that will let you take advantage of Pin; your lifetime 'a can only refer to things that outlive the struct.

The only ways to have this type of self-reference are:

  • write it using unsafe code to bypass borrow checking,
  • use a library like yoke or ouroboros which does that and wraps it in a safe interface (this costs extra heap allocation), or
  • use the built-in self-reference capability of async blocks (which are not suitable for your goal).
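The third option can be illustrated with the standard library alone. In this sketch, YieldOnce, NoopWaker, self_referential_future and run are hypothetical helpers, not library APIs. The async block owns a Vec and keeps a borrow of it across an .await, so the state machine the compiler generates holds a reference into itself; that is why it must be pinned before it is polled. The borrow stays internal to the block, though: it still does not let one field of CustomSink borrow from another, which is why this option does not fit the goal.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Returns `Pending` once before completing, forcing a real suspension point.
pub struct YieldOnce(pub bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// The async block owns `buffer` and holds a borrow of it across `.await`,
/// so the generated future is self-referential.
pub fn self_referential_future() -> impl Future<Output = usize> {
    async {
        let buffer = vec![1u8, 2, 3];
        let first_two = &buffer[..2]; // borrows data owned by the future itself
        YieldOnce(false).await; // the borrow lives across this suspension
        first_two.len() + buffer.len()
    }
}

/// Busy-polls a future with a no-op waker (demo only; real code uses an executor).
pub fn run<F: Future>(fut: F) -> F::Output {
    // Pinning on the heap guarantees the future never moves once polled.
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```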

In your situation, I would recommend avoiding async fns and implementing whatever logic you need manually inside poll_flush and poll_ready. That way you don't need to deal with opaque Futures that borrow from self. Or, if that is infeasible, do what you already thought of, “clone an Arc which holds the database pool” — use shared ownership rather than borrowing.
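A minimal sketch of the shared-ownership route, using only std. The flush future owns a cloned Arc and the buffer taken with std::mem::take, so it borrows nothing from self, is 'static and Send, and can be stored in the struct and driven from a poll_flush-style method. Pool, write_batch, BufferedSink and flush_once are hypothetical placeholders (no real database is involved), and poll_flush here is an inherent method with the same shape as Sink::poll_flush rather than an implementation of the futures trait.

```rust
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{ready, Context, Poll, Wake, Waker};

type BoxFuture<'a, O> = Pin<Box<dyn Future<Output = O> + Send + 'a>>;

/// Hypothetical stand-in for a database pool: it just records written rows.
#[derive(Default)]
pub struct Pool {
    rows: Mutex<Vec<String>>,
}

impl Pool {
    pub fn rows(&self) -> Vec<String> {
        self.rows.lock().unwrap().clone()
    }
}

/// Hypothetical async write. It owns its inputs, so its future is `'static + Send`.
async fn write_batch(pool: Arc<Pool>, batch: Vec<String>) -> Result<(), String> {
    pool.rows.lock().unwrap().extend(batch);
    Ok(())
}

pub struct BufferedSink {
    pool: Arc<Pool>,
    buffer: Vec<String>,
    in_flight: Option<BoxFuture<'static, Result<(), String>>>,
}

impl BufferedSink {
    pub fn new(pool: Arc<Pool>) -> Self {
        Self { pool, buffer: Vec::new(), in_flight: None }
    }

    pub fn push(&mut self, item: String) {
        self.buffer.push(item);
    }

    /// Same shape as `Sink::poll_flush`.
    pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), String>> {
        // Every field is `Unpin`, so unpinning `self` is fine.
        let this = self.get_mut();
        loop {
            // Finish any write that is already running.
            if let Some(fut) = this.in_flight.as_mut() {
                let result = ready!(fut.as_mut().poll(cx));
                this.in_flight = None;
                if result.is_err() {
                    return Poll::Ready(result);
                }
            }
            if this.buffer.is_empty() {
                return Poll::Ready(Ok(()));
            }
            // Start a new write that owns a cloned Arc and the taken buffer.
            let batch = mem::take(&mut this.buffer);
            let fut: BoxFuture<'static, Result<(), String>> =
                Box::pin(write_batch(Arc::clone(&this.pool), batch));
            this.in_flight = Some(fut);
        }
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Demo helper: polls `poll_flush` once with a no-op waker.
pub fn flush_once(sink: &mut BufferedSink) -> Poll<Result<(), String>> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    Pin::new(sink).poll_flush(&mut cx)
}
```

Because nothing in the stored future points back into the struct, the struct stays Unpin and no self-reference is needed; the cost is one Arc clone per flush.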


Awesome, thank you for the quick reply!

I ended up just cloning the Arc of database pool connections and creating the future that way. Thank you!
