Hi everyone,
Hope everyone has a great day!
I have the following code:
// tokio = { version = "1.34.0", features = ["full"] }
// futures = "0.3.30"
// tokio-stream = "0.1.15"
// pin-project-lite = "0.2"
// anyhow = "1.0.71"
use std::{
    pin::Pin,
    task::{Context, Poll},
};
use futures::{future::BoxFuture, FutureExt, Sink};
use pin_project_lite::pin_project;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    Ok(())
}
pin_project! {
    pub struct CustomSink<'a, T>
    {
        buffer: Vec<T>,
        // Future<Output = Result<(), Error>>
        #[pin]
        future: Option<BoxFuture<'a, Result<(), anyhow::Error>>>,
    }
}
impl<'a, T> CustomSink<'a, T> {
    pub fn new() -> Self {
        Self {
            buffer: Vec::new(),
            future: None,
        }
    }
    pub async fn flush(buffer: Vec<T>) -> Result<(), anyhow::Error> {
        if buffer.is_empty() {
            return Ok(());
        }
        Ok(())
    }
    pub async fn flush_self(&self) -> Result<(), anyhow::Error> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        Ok(())
    }
}
impl<'a, T> Sink<T> for CustomSink<'a, T>
where
    T: Send + Sync + 'a,
{
    type Error = anyhow::Error;
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        todo!()
    }
    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        todo!()
    }
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = Pin::get_mut(self);
        if this.buffer.is_empty() {
            return Poll::Ready(Ok(()));
        }
        // This compiles but forces me to pass in the params explicitly.
        // If I want to mutate self inside of `flush` I would not be able to.
        // let future = Self::flush(std::mem::take(&mut this.buffer)).boxed();
        //
        // Flush is supposed to flush the buffer to a database/filesystem/whatever and
        // calls a bunch of async functions. In the process it might need to mutate self
        // (for instance, store an acquired connection, or update the stored connection
        // if it is dead).
        //
        // This is the line that fails to compile:
        let future = this.flush_self().boxed();
        this.future = Some(future);
        todo!()
    }
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        todo!()
    }
}
The issue is that the code does not compile:
   Compiling rust_playground v0.1.0 (/Users/dario/code/rust_playground)
error: future cannot be sent between threads safely
   --> src/main.rs:74:22
    |
74  |         let future = this.flush_self().boxed();
    |                      ^^^^^^^^^^^^^^^^^ future returned by `flush_self` is not `Send`
    |
    = help: the trait `Sync` is not implemented for `dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send`, which is required by `impl futures::Future<Output = Result<(), anyhow::Error>>: std::marker::Send`
note: captured value is not `Send` because `&` references cannot be sent unless their referent is `Sync`
   --> src/main.rs:40:29
    |
40  |     pub async fn flush_self(&self) -> Result<(), anyhow::Error> {
    |                             ^^^^^ has type `&CustomSink<'_, T>` which is not `Send`, because `CustomSink<'_, T>` is not `Sync`
note: required by a bound in `futures::FutureExt::boxed`
   --> /Users/dario/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/future/mod.rs:516:23
    |
514 |     fn boxed<'a>(self) -> BoxFuture<'a, Self::Output>
    |        ----- required by a bound in this associated function
515 |     where
516 |         Self: Sized + Send + 'a,
    |                       ^^^^ required by this bound in `FutureExt::boxed`
error: could not compile `rust_playground` (bin "rust_playground") due to 1 previous error
I took a look at BoxFuture and it seems its bound does not include Sync. Does this mean CustomSink cannot be Sync because the BoxFuture it stores is not Sync, so this approach could never work?
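To check my reading of the error, here is a minimal std-only sketch of the auto traits involved. BoxFut is my own alias that mirrors the shape of BoxFuture, and Holder is a hypothetical stand-in for CustomSink; neither name comes from the futures crate.

```rust
use std::future::Future;
use std::pin::Pin;

// Assumption: same shape as futures::future::BoxFuture<'a, T>, spelled out with std only.
type BoxFut<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Compile-time probes: a call only type-checks if the bound holds.
fn is_send<T: Send>() -> bool {
    true
}
fn is_sync<T: Sync>() -> bool {
    true
}

// Hypothetical stand-in for CustomSink: a buffer plus a stored boxed future.
struct Holder<'a> {
    buffer: Vec<u8>,
    future: Option<BoxFut<'a, ()>>,
}

fn check() -> bool {
    // These hold: the trait object is declared `+ Send`, and Vec<u8> is Send + Sync.
    let fut_send = is_send::<BoxFut<'static, ()>>();
    let holder_send = is_send::<Holder<'static>>();
    let vec_sync = is_sync::<Vec<u8>>();

    // These do not compile if uncommented: `dyn Future + Send` is not `Sync`,
    // so Holder is not `Sync`, and therefore `&Holder` is not `Send`.
    // is_sync::<Holder<'static>>();
    // is_send::<&Holder<'static>>();

    fut_send && holder_send && vec_sync
}
```

If this is right, any future that captures &self of a struct holding such a boxed future will fail the Send bound on boxed(), which matches the note in the compiler output.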
My second question is: am I on the right track with this approach/pattern, or is there another way to solve this? The alternative would be to pass everything the function needs as parameters instead of using a self reference. That would force me to clone an Arc which holds the database pool and pass it to the function every time.
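A minimal std-only sketch of that alternative, as I understand it. Everything here is hypothetical and only for illustration: Pool and its write_all method stand in for the real database pool, OwnedFlushSink stands in for CustomSink, String replaces anyhow::Error, and poll_flush is an inherent method with the same shape as Sink::poll_flush rather than the real trait impl.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Owned boxed future: it borrows nothing from the sink, hence 'static.
type BoxFut<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

// Hypothetical stand-in for a database pool.
pub struct Pool;

impl Pool {
    // Hypothetical write call; reports how many items were written.
    pub async fn write_all<T>(&self, items: Vec<T>) -> Result<usize, String> {
        Ok(items.len())
    }
}

pub struct OwnedFlushSink<T> {
    buffer: Vec<T>,
    pool: Arc<Pool>,
    in_flight: Option<BoxFut<Result<usize, String>>>,
}

impl<T: Send + 'static> OwnedFlushSink<T> {
    pub fn new(pool: Arc<Pool>) -> Self {
        Self { buffer: Vec::new(), pool, in_flight: None }
    }

    pub fn push(&mut self, item: T) {
        self.buffer.push(item);
    }

    pub fn buffered(&self) -> usize {
        self.buffer.len()
    }

    pub fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), String>> {
        if self.in_flight.is_none() {
            if self.buffer.is_empty() {
                return Poll::Ready(Ok(()));
            }
            // Move the data and a cloned Arc into the future instead of borrowing self.
            let items = std::mem::take(&mut self.buffer);
            let pool = Arc::clone(&self.pool);
            let fut: BoxFut<Result<usize, String>> =
                Box::pin(async move { pool.write_all(items).await });
            self.in_flight = Some(fut);
        }
        let fut = self.in_flight.as_mut().expect("in_flight was set above");
        match fut.as_mut().poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(res) => {
                // Drop the finished future before reporting the result.
                self.in_flight = None;
                Poll::Ready(res.map(|_| ()))
            }
        }
    }
}

// Waker that does nothing; only used to poll by hand in this sketch.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls once without a runtime. That is enough here because write_all never yields.
pub fn flush_once<T: Send + 'static>(sink: &mut OwnedFlushSink<T>) -> Poll<Result<(), String>> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    sink.poll_flush(&mut cx)
}
```

With this shape the stored future owns everything it touches, so nothing borrows self while the future lives inside self. If flush needs to update state on the sink (for example a refreshed connection), I assume the future could return that state and poll_flush could store it once the future resolves.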
Thanks in advance for any input/help! If I happened to have missed any crucial information please let me know as well!