NOTE: Timer is always set to 1 second. smol::Timer::after(Duration::from_secs(1))
use futures::pin_mut;
use futures::AsyncWrite;
use futures::AsyncWriteExt;
use futures::Future;
use futures_util::future::FutureExt;
use pin_project::pin_project;
use smol::{net::{TcpStream, TcpListener}, lock::{Mutex, futures::LockArc, MutexGuard, MutexGuardArc}};
#[pin_project]
pub struct SharedWriter<W> {
#[pin] writer: Arc<Mutex<W>>,
writer_lock: Option<MutexGuardArc<W>>,
f: smol::Timer,
}
impl<W: AsyncWrite + Unpin> Future for SharedWriter<W> {
type Output = std::io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
futures::ready!(this.f.poll_unpin(cx));
Poll::Ready(Ok(()))
}
}
then I use the above code like this
let mut f = futures::stream::FuturesUnordered::new();
f.push(shared_writer1);
f.push(shared_writer2);
f.push(shared_writer3);
while let Some(r) = f.next().await {
println!("{:?}", r.unwrap());
}
There are no problems and FuturesUnordered runs each shared_writer one after the other. If I change SharedWriter to just acquire the lock, without a timer involved, it also works just fine. FuturesUnordered executes each shared_writer one by one.
impl<W: AsyncWrite + Unpin> Future for SharedWriter<W> {
type Output = std::io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let guard = if this.writer_lock.is_none() {
let lock = this.writer.lock_arc();
pin_mut!(lock);
*this.writer_lock = Some(futures::ready!(lock.poll(cx)));
this.writer_lock.as_mut().unwrap()
} else {
this.writer_lock.as_mut().unwrap()
};
*this.writer_lock = None; // drop lock
Poll::Ready(Ok(()))
}
}
But when I add both a timer and acquire lock, FuturesUnordered stops after executing the first shared_writer and there is a deadlock.
impl<W: AsyncWrite + Unpin> Future for SharedWriter<W> {
type Output = std::io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let guard = if this.writer_lock.is_none() {
let lock = this.writer.lock_arc();
pin_mut!(lock);
*this.writer_lock = Some(futures::ready!(lock.poll(cx)));
this.writer_lock.as_mut().unwrap()
} else {
this.writer_lock.as_mut().unwrap()
};
futures::ready!(this.f.poll_unpin(cx)); // causes problem
*this.writer_lock = None; // drop lock
Poll::Ready(Ok(()))
}
}
I don't see how this is a problem. Acquire lock, poll the timer, Poll::Pending or Poll::Ready from timer, it doesn't matter. Eventually timer will return Poll::Ready and the lock will be released. Someone will make progress.
I involve FuturesUnordered because if I just execute each shared_writer future one by one without FuturesUnordered, it works just fine either impl and doesn't deadlock.
shared_writer1.await.unwrap();
shared_writer2.await.unwrap();
shared_writer3.await.unwrap();
No problems either implementation with the above calling code.