Thread safe `tokio_util::time::delay_queue`?

I'm looking for a tokio::time::delay_queue that's thread safe. Basically, I want one tasks to be able to modify timers in the queue while another listen for timeouts. Is this possible?

Use case is something like this:

I have a ranking algorithm that's constantly adjusting scores based on upvotes and time decay. An upvote can "bump" a score up but as time elapsed increase, the score decays. When the decay lowers passes a certain threshold, I need to wake up and do some operation.

A delay queue has all the buttons I need to achieve this but I don't think it's thread safe. Namely, I need to be able to adjust the timers inside this queue from another thread because a "bump" will result in the timeout being extended

1 Like

You could wrap it in an actor. One input queue to receive the bumps, an output to send the expired events.

1 Like

Don't think that'll work because consuming from the DelayQueue takes ownership of the queue:

fn run(mut self) {
        tokio::spawn(async move {
            while let Some(value) = self.delay_queue.next().await {
                println!("Timer value: {:?}", value);
            }
        });
    }

If I understand the following,

it means to create a tokio actor task that has two channels. This task owns the DelayQueue, so it has no concurrency problems when accessing the DelayQueue.

The input channel would be an mpsc channel. You would define a bump message that is sent over the input channel by other tasks, and when it is received by the actor task, the actor task can then modify the timers in the queue.

Note: The DelayQueue is not in the tokio package, it is in tokio_util. Could you please edit the title and post so that others aren't confused?

So IIUC something like this?

pub fn run(&mut self) {
    let mut delay_queue = DelayQueue::new();
    let (tx, mut rx) = mpsc::channel(32);
    self.sender = Some(tx);

    tokio::spawn(async move {
        loop {
            select! {
                Some(key) = rx.recv() => {
                    delay_queue.insert(key.clone(), std::time::Duration::from_secs(5));
                    println!("Received bump: {}", key);
                }
                // Handle delayed messages
                Some(delayed) = delay_queue.next() => {
                    let key = delayed.into_inner();
                    println!("Key timed out: {}", key);
                }
            }
        }
    });
}

Sry if this is obvious, I'm still a noob at Rust... but is there another way that would allow the consumption of the delay_queue and the command (aka "bump") to happen on two different tasks?

Done

Yes, that looks like the right idea! I haven't used these libraries myself, but that's what I understood they were saying.

Sorry, I'm not familiar enough with tokio and DelayQueue to say. @the8472 recommended the actor approach, presumably because you can't access it from multiple tasks, but I don't want to presume too much.

but is there another way that would allow the consumption of the delay_queue and the command (aka "bump") to happen on two different tasks?

Well you can share the queue across tasks by wrapping it in an Arc<Mutext<_>>. The issue is that you need &mut to await it and also to modify it. So bumping and waiting would stall each other, which I assume is not desirable. You'd need some sort of interruptible lock-and-await mechanism to make that work.
So putting the delayqueue into a single task and then using other mechanisms (such as channels) to coordinate access should work better.

1 Like

Ya I thought so too. Thanks for confirming

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.