Condvar.wait_timeout() Delays tokio::mpsc::channel?

I am trying to send a message every 5 seconds between two tokio tasks through a tokio::mpsc::channel, using a Condvar as a scheduler.

Message delivery gets significantly delayed when I use Condvar::wait_timeout(), but if I put "tokio::time::sleep(n)" after tx.send() the delay disappears.

Why does Condvar affect tokio's scheduler like this, and is there a better way to avoid the problem?

rustc: 1.53.0 (53cb7b09b 2021-06-17)
tokio: 1.7.1

use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use tokio::{sync::mpsc::channel, time::sleep};

#[tokio::main]
async fn main() {
    let schedule_cond = Arc::new((Mutex::new(true), Condvar::new()));
    let (tx, mut rx) = channel::<std::time::Instant>(10);
    tokio::spawn(async move {
        loop {
            tx.send(std::time::Instant::now()).await.unwrap();
            println!("message has sent");

            // --- The workaround ---
            // sleep(Duration::from_secs(1)).await;

            let (resume_checking, checking_wait_condvar) = &*schedule_cond;
            let waiting_resume_checking_value = resume_checking.lock().unwrap();
            let (exit, _) = checking_wait_condvar
                .wait_timeout(waiting_resume_checking_value, Duration::from_secs(5))
                .unwrap();
            if !*exit {
                break;
            }
        }
    });

    let jh = tokio::spawn(async move {
        while let Some(time) = rx.recv().await {
            println!("message received payload:{:?}", time);
        }
    });
    jh.await.unwrap();
}
# output
message has sent
message received payload:Instant { tv_sec: 30229, tv_nsec: 563806733 }
message has sent
message has sent
message has sent
message has sent
message has sent
message has sent
message has sent
message has sent
message has sent
message has sent
message received payload:Instant { tv_sec: 30234, tv_nsec: 563942332 }
message received payload:Instant { tv_sec: 30239, tv_nsec: 564141429 }
message received payload:Instant { tv_sec: 30244, tv_nsec: 564320206 }
message received payload:Instant { tv_sec: 30249, tv_nsec: 564507173 }
message received payload:Instant { tv_sec: 30254, tv_nsec: 564690380 }
message received payload:Instant { tv_sec: 30259, tv_nsec: 564873635 }
message received payload:Instant { tv_sec: 30264, tv_nsec: 565091873 }
message received payload:Instant { tv_sec: 30269, tv_nsec: 565208349 }
message received payload:Instant { tv_sec: 30274, tv_nsec: 565357607 }
message received payload:Instant { tv_sec: 30279, tv_nsec: 565574183 }
message has sent
message has sent
message has sent

...

This is explained in this article: Async: What is blocking?

thanks for the information! I will try to read it.

You're using std::sync::Condvar which is a primitive the tokio runtime is unaware of. Therefore, it'll block the entire runtime, which means that the tokio thread you spawn to receive the notifications will not get a turn to run until the timeout has occurred. The next chance it gets is once the thread reaches an await, and even then it's not guaranteed.

I was curious about this. Normally, cooperative user-level threading packages provide their own versions of locks, condition variables, and so on, see Python's asyncio as an example. tokio appears to provide a Mutex, and a simple Notify facility, but the developers seem hesitant to add support for condition variables. The notify facility doesn't support timeouts (or multiple waiters).

For your use case, I assume you need the ability to exit the thread sending the periodic notifications when the application shuts down, and you'd be using the associated condition for that (though this is not shown in your code).

Unless there's an alternate facility that provides both the ability to communicate a value and a timeout, this could be a compelling use case for adding condition variables in tokio. An alternate facility would be something like a channel that has a timeout associated with the recv operation (which doesn't appear to be the case for tokio::mpsc::channel from what I can see at first glance.)

It doesn't have a dedicated method for timeouts because any asynchronous operation can be given a timeout using tokio::time::timeout. The same applies to mpsc channels.

In that case, the original poster could probably use a channel instead of a condition variable for their purpose: playground

use std::time::Duration;
use tokio::{sync::mpsc::channel, time::timeout, time::sleep};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = channel::<std::time::Instant>(10);
    let (tx2, mut rx2) = channel::<std::time::Instant>(10);
    tokio::spawn(async move {
        loop {
            tx.send(std::time::Instant::now()).await.unwrap();
            println!("message sent");

            let res = timeout(Duration::from_millis(250), rx2.recv()).await;
            if res.is_ok() {
                break;
            }
        }
    });

    tokio::spawn(async move {
        while let Some(time) = rx.recv().await {
            println!("message received payload:{:?}", time);
        }
    });

    let shutdown = tokio::spawn(async move {
        sleep(Duration::from_secs(2)).await;
        tx2.send(std::time::Instant::now()).await.unwrap();
    });
    shutdown.await.unwrap();
}

Thanks for your suggestion. Yes, I want the message-sending thread to exit in an explicit way, as you mentioned.

Using tokio::time::timeout is a good solution, and it makes the code simpler than Condvar IMO.
However, there might still be strong demand for a tokio version of Condvar ...
https://github.com/tokio-rs/tokio/issues/3892

Theoretically speaking, you use condition variables when you need to combine state changes and notifications in an atomic way. That's why condition variables are related to mutexes. The mutex is acquired, the state is examined, and if there's a need to wait for a notification, you call the wait function which ensures that the mutex isn't released until it is certain that the caller will receive any future notifications issued by a thread that would acquire the same mutex, change state, and issue the notification. Recall that a notification on a condition variable is a no-op when there are no threads waiting for it.
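The classic pattern looks like this with std::sync::Condvar: the predicate is checked under the mutex, and wait atomically releases the lock and blocks, so a notification issued between the check and the wait cannot be lost. This is a generic, non-tokio illustration of that atomicity:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let shared = Arc::new((Mutex::new(VecDeque::new()), Condvar::new()));

    let producer = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            let (queue, cvar) = &*shared;
            // State change and notification happen under the same mutex,
            // so a consumer checking the queue cannot miss the wakeup.
            queue.lock().unwrap().push_back(7);
            cvar.notify_one();
        })
    };

    let (queue, cvar) = &*shared;
    let mut guard = queue.lock().unwrap();
    // Re-check the predicate on every wakeup (spurious wakeups are allowed).
    while guard.is_empty() {
        guard = cvar.wait(guard).unwrap();
    }
    assert_eq!(guard.pop_front(), Some(7));
    drop(guard);
    producer.join().unwrap();
}
```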

Without condition variables and their ability to combine mutual exclusion and waiting for notifications, you would need a notification facility that remembers signals in order to avoid the lost wakeup problem that would otherwise arise. Based on these comments by carllerche, tokio's notify_one() appears to store a pending notification when there are no waiters. This makes it more akin to a traditional semaphore.

A potential problem here lies with workloads that notify frequently but rarely have any waiters. For instance, a highly concurrent server processing short requests approaching full utilization. In this case, traditional condition variables tend to perform better than semaphores because they do not require exclusive ownership in the cache coherency protocol. (The notify mechanism basically checks if there are any waiters and it becomes a no-op if there aren't any.) But I know too little of tokio at this point to judge whether this problem applies to tokio and its intended uses. Independent of that, I find the use case of porting multi-threaded apps to async code potentially compelling.

The Notify utility will store a permit when notify_one is called without anyone waiting for a notification. It will store at most one permit, so calling notify_one when there is already a permit is a no-op.

Hmmm, then I don't immediately understand how the Notify facility would replace condition variables without incurring additional overhead. In traditional use, the potential waiter would not call wait() if it can proceed based on the state it examines; but with Notify, there's now a notification that must be consumed (?).

I would love to see an example of how you would implement some of the traditional applications of condition variables (such as a producer/consumer queue, or a single-writer/multiple-reader lock) with tokio's facilities efficiently and correctly.

Here's a producer/consumer queue: playground

I didn't test it

About this part:

            let on_send_event = self.shared.on_send.notified();
            
            match self.try_recv() {
                Some(value) => return value,
                None => on_send_event.await,
            }

Is the notification (if already present) consumed on the call to .notified(), or is it consumed as part of the call to .await? How expensive is the call to .notified()?

It seems to me that this solution, when compared to the traditional

while (q.empty())
   cond_wait()
....

will call into .notified() every time as opposed to only when the queue is empty.

[ Also, side question, it is apparently ok to not await the future returned from .notified() at all, presumably because it provides a suitable Drop trait implementation? ]

Hm, I have come to realize that creating the notified() future early doesn't actually let you consume events emitted by notify_one, only those emitted by notify_waiters, so the playground I gave is not correct.

To answer your questions, calling .notified() currently has the cost of a single atomic load, and has an appropriate destructor.

BTW, one other argument in favor of traditional primitives is that they are relatively well understood so they tend to result in correct code when used idiomatically even by non-experts.

That said, C-style condition variables can be hard to use idiomatically and are often misunderstood; but API designs that derive the condition variable from the monitor's mutex help here, e.g. j.u.c.'s newCondition. I am curious why Rust's std::sync::Condvar chose the brittle C-style API instead and allowed construction of condition variables separately from an associated mutex.

Taken together, the cost is certainly higher than the cost of not doing anything as with traditional condition variables.
