How to write an efficient future that waits for a value to reach a target value?

Problem

I want to write an async future whose task is to wait until some value becomes equal to a target value.

Below is my own implementation, which follows the "Task Wakeups with Waker" chapter of Asynchronous Programming in Rust (rust-lang.github.io).

My question is: does this implementation have any room for improvement? Is there already a highly optimized implementation of this?

My Code

use std::{
    sync::{Arc, Mutex},
    task::{Poll, Waker},
};

use futures::Future;

#[cfg(test)]
mod tests {
    use std::{
        sync::{Arc, Mutex},
        thread,
        time::Duration,
    };

    use futures::executor::block_on;

    use crate::{SharedState, ValueEvent};

    #[test]
    fn it_works() {
        let state = Arc::new(Mutex::new(SharedState {
            value: 0,
            waker: None,
        }));
        let thread_shared_state = state.clone();
        let test_shared_state = state.clone();
        std::thread::spawn(move || loop {
            thread::sleep(Duration::from_millis(20));
            let mut state = thread_shared_state.lock().unwrap();
            state.value += 1;
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
            if state.value >= 200 {
                break;
            }
        });
        let value_event = ValueEvent::new(state, 50);
        block_on(value_event);
        println!("value_event occurred.");
        thread::sleep(Duration::from_secs(1));
        let state = test_shared_state.lock().unwrap();
        println!("end value:{}", state.value);
    }
}
struct SharedState<State> {
    value: State,
    waker: Option<Waker>,
}
struct ValueEvent<Value>
where
    Value: PartialEq,
{
    shared_state: Arc<Mutex<SharedState<Value>>>,
    target: Value,
}
impl<Value: PartialEq> ValueEvent<Value> {
    pub fn new(state: Arc<Mutex<SharedState<Value>>>, target: Value) -> Self {
        Self {
            shared_state: state,
            target,
        }
    }
}
impl<Value: PartialEq> Future for ValueEvent<Value> {
    type Output = ();

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.value == self.target {
            Poll::Ready(())
        } else {
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

The only improvement that I can see is to store the target value inside the SharedState so that you don't need to wake the waker when changing it to something different than the target value.


Thanks for your suggestion!
I rewrote the code like this:

use std::{
    sync::{Arc, Mutex},
    task::{Poll, Waker},
};

use futures::Future;

struct SharedValue<Value> {
    value: Value,
    target: Value,
    waker: Option<Waker>,
}
impl<Value: PartialEq> SharedValue<Value> {
    pub fn update(&mut self, new_value: Value) {
        self.value = new_value;
        if self.value == self.target && self.waker.is_some() {
            self.waker.take().unwrap().wake();
        }
    }
}
struct ValueEvent<Value>
where
    Value: PartialEq,
{
    inner: Arc<Mutex<SharedValue<Value>>>,
}
impl<Value: PartialEq> ValueEvent<Value> {
    pub fn new(inner: Arc<Mutex<SharedValue<Value>>>) -> Self {
        Self { inner }
    }
}
impl<Value: PartialEq> Future for ValueEvent<Value> {
    type Output = ();

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let mut inner = self.inner.lock().unwrap();
        if inner.value == inner.target {
            Poll::Ready(())
        } else {
            inner.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
#[cfg(test)]
mod tests {
    use futures::executor::block_on;

    use super::*;
    #[test]
    fn test_value_event() {
        let value = Arc::new(Mutex::new(SharedValue {
            value: 0,
            target: 1,
            waker: None,
        }));
        let cloned_value = value.clone();
        std::thread::spawn(move || {
            cloned_value.lock().unwrap().update(1);
        });
        let value_event = ValueEvent::new(value);
        block_on(value_event);
    }
}

I am curious whether there is a more elegant asynchronous way of being notified that a value has reached its target.

Well, you could use an existing tool like tokio::sync::Notify which would let you avoid custom poll functions.

I note that this:

if self.value == self.target && self.waker.is_some() {
    self.waker.take().unwrap().wake();
}

Can be written without an unwrap:

if self.value == self.target {
    if let Some(waker) = self.waker.take() {
        waker.wake();
    }
}

Thank you very much!
