Tokio Timeout not Timeouting

I have a (deterministic) function which, on some inputs, may take a very long time to return. If the original function is f : X -> Y, I want a function g : X -> Option<Y> such that g(i) = Some(f(i)) if the computation of f(i) didn't time out, and None otherwise.

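// Imports inferred from the identifiers used below (assumed: futures 0.3, rand 0.8, tokio 1.x).
use std::time::Duration;

use futures::future::lazy;
use rand::{thread_rng, Rng};
use tokio::time::timeout;

fn silly(x : usize) -> usize {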
    // choose a random boolean
    let mut rng = thread_rng();
    let b = rng.gen_bool(0.5);
    if b {
        x + 1
    } else {
        loop {}
    }
}

async fn timeout_silly(duration : Duration, x : usize) -> Option<usize> {
    match timeout(duration, lazy(|_| {silly(x)})).await {
        Ok(x) => Some(x),
        Err(_) => None
    }
}

#[tokio::main]
async fn main(){
    match timeout_silly(Duration::from_secs(1), 5).await {
        Some(x) => println!("found {}", x),
        None => println!("timeout")
    }
}

I have tried this, but it never seems to time out; it just goes into the loop. What am I doing wrong?

Also, I do not necessarily care about tokio. Anything that gets the job done is fine with me, and the simpler, the better.

Timeouts can only happen when execution reaches an .await, and nowhere else. Code like loop {} cannot be interrupted by anything.

You can insert dummy await points, but ideally you should not run CPU-bound code in async functions.
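To illustrate why a timeout can only fire at an await point, here is a toy sketch that uses only the standard library. Everything in it (YieldOnce, busy_count, poll_with_deadline, the 1024-iteration interval) is a hypothetical name or value made up for the illustration, and the busy-polling loop is not how tokio is implemented. In tokio itself, the usual dummy await point would be tokio::task::yield_now().await.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::{Duration, Instant};

/// Waker that does nothing; good enough because the loop below busy-polls.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hand-rolled "dummy await point": Pending on the first poll, Ready on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            Poll::Pending
        }
    }
}

/// CPU-bound loop that hands control back every 1024 iterations.
async fn busy_count(limit: u64) -> u64 {
    let mut i = 0;
    while i < limit {
        i += 1;
        if i % 1024 == 0 {
            YieldOnce { yielded: false }.await;
        }
    }
    i
}

/// Toy timeout: the deadline is only checked when `poll` returns,
/// i.e. when the future reaches an await point that is Pending.
fn poll_with_deadline<F: Future>(fut: F, limit: Duration) -> Option<F::Output> {
    let deadline = Instant::now() + limit;
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return Some(value);
        }
        if Instant::now() >= deadline {
            return None;
        }
    }
}
```

Without the YieldOnce await inside busy_count, the first call to poll would never return, so the deadline check would never run. That is the situation loop {} creates in the original code.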

Oh, so timeout is not for what I wanted to do with it.

So, how do I achieve what I want? silly is computed as a normal Rust function, and I do not intend to change the implementation of silly.

How can I call silly(i) with a timeout? The result should be Some(silly(i)) if the computation finishes in time, and None otherwise.

I don't necessarily need to use tokio. Just something simple that works with minimal boilerplate.

Unfortunately, there's no nice way to force CPU-bound code to abort.

On some platforms it's possible to forcefully abort a thread, but that is highly problematic: it will leak memory and may leave locks locked. It may make the rest of your program unusable if you're unlucky and the thread was holding a lock for the memory allocator.

So the only reliable way is to put the function in a separate executable, spawn it as a completely new process (Command::spawn), and then kill that process.
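A rough sketch of the process-based approach, using only std::process. The helper name run_with_timeout and the 10 ms polling interval are made up for the illustration, and the sketch only reports the exit status. A real version would also need some way to get the result back from the child (for example by having it print to stdout), which is left out here.

```rust
use std::io;
use std::process::{Command, ExitStatus};
use std::thread;
use std::time::{Duration, Instant};

/// Spawns `program` as a child process and waits at most `limit` for it.
/// Returns Ok(Some(status)) if it exited in time, Ok(None) if it was killed.
fn run_with_timeout(
    program: &str,
    args: &[&str],
    limit: Duration,
) -> io::Result<Option<ExitStatus>> {
    let mut child = Command::new(program).args(args).spawn()?;
    let deadline = Instant::now() + limit;
    loop {
        // try_wait does not block; it returns None while the child is still running.
        if let Some(status) = child.try_wait()? {
            return Ok(Some(status));
        }
        if Instant::now() >= deadline {
            // The operating system reclaims the child's memory and locks.
            let _ = child.kill();
            child.wait()?; // reap the process so it does not linger as a zombie
            return Ok(None);
        }
        thread::sleep(Duration::from_millis(10));
    }
}
```

For example, on a Unix-like system, run_with_timeout("sleep", &["5"], Duration::from_millis(200)) should give Ok(None), since sleep gets killed after the deadline passes.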

It would be much easier if you could modify the silly code and make it abort cooperatively. It could check an AtomicBool once in a while to see if it should continue running.

Oh, I see.

Does this mean I have to make silly an async function?

How would I modify silly in this case?

Any chance I can spawn the function as a process instead of a thread so that I can get the operating system to do the cleanup?

You don't have to make it async, but you have to modify it somehow to gracefully end when asked to, e.g. insert this all over the place:

if should_stop.load(SeqCst) {
    return; // or panic!()
}

where should_stop is &AtomicBool or Arc<AtomicBool>.
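Put together, the cooperative approach could look roughly like the sketch below. The names silly_cooperative and run_with_timeout are hypothetical, and the random boolean from the original silly is replaced by an explicit terminates parameter so the behaviour is reproducible. The flag is read with Ordering::Relaxed here; SeqCst would work as well.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical cooperative variant of `silly`: it checks `should_stop`
/// on every iteration and returns None once it is asked to stop.
fn silly_cooperative(x: usize, terminates: bool, should_stop: &AtomicBool) -> Option<usize> {
    if terminates {
        return Some(x + 1);
    }
    loop {
        if should_stop.load(Ordering::Relaxed) {
            return None;
        }
        std::hint::spin_loop();
    }
}

/// Runs the function on its own thread and waits at most `limit` for a result.
fn run_with_timeout(x: usize, terminates: bool, limit: Duration) -> Option<usize> {
    let should_stop = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&should_stop);
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || {
        // The send fails only if the receiver is already gone; ignore that case.
        let _ = tx.send(silly_cooperative(x, terminates, &flag));
    });
    // Err means the deadline passed (or the worker died), so there is no result.
    let result = rx.recv_timeout(limit).ok().flatten();
    // Ask the worker to stop, then wait for it so no thread is left running.
    should_stop.store(true, Ordering::Relaxed);
    let _ = worker.join();
    result
}
```

With this, run_with_timeout(5, true, Duration::from_secs(1)) returns Some(6), and the non-terminating case returns None after the deadline. The join only returns promptly because the loop checks the flag often enough.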

For processes, there's no out-of-the-box way. The easy way to run a function in another process is fork(), but forking has lots of caveats about threads, open files, locks, etc. If you can't change the function, you'll just need to make another binary with it manually, the hard way.

Like @kornel already said, you should not run CPU bound code as a normal Tokio task. If you want to parallelize execution, you might consider this from the docs:

CPU-bound tasks and blocking code

[…]

If your code is CPU-bound and you wish to limit the number of threads used to run it, you should use a separate thread pool dedicated to CPU bound tasks. For example, you could consider using the rayon library for CPU-bound tasks. It is also possible to create an extra Tokio runtime dedicated to CPU-bound tasks, but if you do this, you should be careful that the extra runtime runs only CPU-bound tasks, as IO-bound tasks on that runtime will behave poorly.

Hint: If using rayon, you can use a oneshot channel to send the result back to Tokio when the rayon task finishes.

Alternatively, if you want the CPU-bound task to be limited to a single CPU, you can also use std::thread::spawn or std::thread::scope.
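If it is acceptable for a timed-out computation to keep running in the background, a plain thread plus a channel already gives the Option-returning shape asked for above. This is only a sketch: with_timeout is a made-up helper name, and the spawned thread is not stopped on timeout. It keeps using a CPU core until the function returns or the process exits.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs `f` on a new thread and waits at most `limit` for its result.
/// On timeout the thread is detached, not cancelled.
fn with_timeout<T, F>(limit: Duration, f: F) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // If the caller already gave up, the receiver is gone and send fails; ignore it.
        let _ = tx.send(f());
    });
    // Ok(value) if the result arrived in time, Err on timeout (or if `f` panicked).
    rx.recv_timeout(limit).ok()
}
```

Called as with_timeout(Duration::from_secs(1), move || silly(5)), this would give Some(6) or None, but every None leaves one thread spinning forever, so it only suits programs that exit soon afterwards.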

I agree with that, but I do think that std::sync::atomic::Ordering::Relaxed should be used instead of SeqCst. See also this discussion from another thread:

Alternatively, use a synchronization primitive to send a message from the controlling thread to the CPU-bound thread. In some scenarios, you might want to make sure that dropping the sender (e.g. because the controlling thread panics) triggers the CPU-bound thread to stop. Most synchronization primitives allow checking if a channel has been closed (e.g. because the sender has been dropped).
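A minimal sketch of the channel variant, assuming std::sync::mpsc is the synchronization primitive. The names worker and run_then_drop_sender are hypothetical, and the 1 ms sleep stands in for a chunk of real work. The worker stops either when it receives a message or when try_recv reports that the sender has been dropped.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread;
use std::time::Duration;

/// Works in small steps and checks the channel between steps.
/// Returns the number of steps completed before it was told to stop.
fn worker(stop: Receiver<()>) -> u64 {
    let mut steps = 0u64;
    loop {
        match stop.try_recv() {
            // An explicit stop message, or the sender was dropped: stop either way.
            Ok(()) | Err(TryRecvError::Disconnected) => return steps,
            // Nothing received and the sender is still alive: keep working.
            Err(TryRecvError::Empty) => {}
        }
        steps += 1;
        thread::sleep(Duration::from_millis(1)); // stand-in for a chunk of work
    }
}

/// Lets the worker run for `run_for`, then stops it by dropping the sender.
fn run_then_drop_sender(run_for: Duration) -> u64 {
    let (stop_tx, stop_rx) = mpsc::channel::<()>();
    let handle = thread::spawn(move || worker(stop_rx));
    thread::sleep(run_for);
    drop(stop_tx); // the same thing happens if the controlling thread panics
    handle.join().expect("worker panicked")
}
```

Compared with an AtomicBool, the controlling side cannot forget to signal: once the sender goes out of scope for any reason, the worker sees Disconnected on its next check.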


P.S.: See also this thread for some (possibly non-idiomatic) ideas on how to signal a thread to terminate.
