Using Waker and Mutex to communicate between async threads

This might be a stupid question but I'm debugging a pretty complex async code and need to be sure.

Imagine you are writing HTTP2 where you have a Connection and multiple Streams. Connection and Streams are communicating over Arc<Mutex>. Consider the following code snippets:

// Connection
impl futures::Stream for Connection {
    type Item = usize;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = Pin::into_inner(self);
        let mut store = this.stage.lock().unwrap(); // store is Arc<std::sync::Mutex<Store>>
        store.waker = Some(cx.waker().clone()); 
        ...
        Poll::Pending
    }
}
// Stream
task::spawn(async move {
    ...
    let mut store = this.stage.lock().unwrap();
    if let Some(w) = store.waker.as_ref() {
        w.clone().wake();
    }
    ...
});

Does the cx instance always point to the same "worker" or better does such context always represent the same context. If I save it once to a variable within Arc<Mutex<...>> to be used by other "threads" can these threads just clone it and wake as many times as they want? Also, std::sync::Mutex can't cause any problems in such cases, right?

Every time that poll_next is called, you should update the Waker because it may have changed. Using the std mutex like this is fine.

1 Like

Do you think I should update/set the Waker instance in futures::Sink functions as well?

impl futures::Sink<usize> for Stream {
    type Error = Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        ...
        store.waker = Some(cx.waker().clone()); 
        ...
    }

    fn start_send(mut self: Pin<&mut Self>, item: usize) -> Result<(), Self::Error> {
        ...
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        ...
        store.waker = Some(cx.waker().clone()); 
        ...
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        ...
        store.waker = Some(cx.waker().clone()); 
        ...
    }
}

Well, the sink should probably have its own waker. Otherwise you cannot read and write at the same time.

Hum ... Usually you'll use it like this:

stream.send(100).await.unwrap(); // sink cx
while let Some(event) = stream.next().await { // stream cx
    stream.send(200).await.unwrap(); // sink cx
}

How would you use it "at the same time"? You are saying that cx objects for futures::Stream and futures::Sink are different? Context doesn't represent simply a "worker"?

Well, for example, the futures crate provides a utility that will let you split a Stream + Sink into two halves. Those two halves can be moved to different tasks.

And no, the context doesn't really represent a worker. Even if two tasks run on the same Tokio worker thread, they will still have different contexts. On the other hand, two things in the same Tokio task can have different contexts if you use something like FuturesUnordered which will create its own context.

Huh, I see. That's new. Thanks a lot. Maybe I revisit this thread later.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.