Concurrency & Threads

Can a tight loop in Rust prevent the current thread from executing any other code? Is the Rust runtime preemptive or cooperative?

(preemptive at language construct level - like Erlang)

Not sure if I understand your question, but Rust has support for Futures, which let you use a runtime such as the one provided by tokio. Such a runtime schedules tasks on one or more CPUs, and switching between tasks works cooperatively.


Rust also supports threads, so it can be preemptive as well. Both work.

Rust threading only uses OS threads, not green threads in a runtime. So if you start multiple std::thread threads, the operating system will schedule them as it sees fit, usually preemptively.

If you mean concurrency in async code, then I think you can roughly say that it's cooperative at .await points, but the specifics may still depend on your choice of async runtime.
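To make the "cooperative at .await points" idea concrete, here is a minimal sketch using only the standard library. The names `run_both`, `YieldOnce`, `polite`, `greedy` and `trace` are made up for this illustration, and the busy-polling executor is a toy, not how tokio or any real runtime works. It only shows that a task which never reaches a pending `.await` keeps the thread until it finishes.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; the toy executor below simply polls in a loop.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Future that returns Pending exactly once, handing control back to the executor.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

type Task = Pin<Box<dyn Future<Output = ()>>>;
type Log = Rc<RefCell<Vec<String>>>;

// Hypothetical toy executor: polls `a`, then `b`, in turn until both are done.
fn run_both(mut a: Task, mut b: Task) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let (mut a_done, mut b_done) = (false, false);
    while !(a_done && b_done) {
        if !a_done {
            a_done = a.as_mut().poll(&mut cx).is_ready();
        }
        if !b_done {
            b_done = b.as_mut().poll(&mut cx).is_ready();
        }
    }
}

// Yields to the executor after every step.
async fn polite(log: Log) {
    for i in 1..=3 {
        log.borrow_mut().push(format!("polite {i}"));
        YieldOnce(false).await;
    }
}

// Never awaits anything, so it cannot be interrupted once it is polled.
async fn greedy(log: Log) {
    for i in 1..=3 {
        log.borrow_mut().push(format!("greedy {i}"));
    }
}

// Runs both tasks on the current thread and returns the order of their steps.
fn trace() -> Vec<String> {
    let log: Log = Rc::default();
    run_both(Box::pin(polite(log.clone())), Box::pin(greedy(log.clone())));
    let steps = log.borrow().clone();
    steps
}

fn main() {
    for line in trace() {
        println!("{line}");
    }
}
```

With this toy executor the recorded order is polite 1, greedy 1, greedy 2, greedy 3, polite 2, polite 3: once `greedy` is polled it runs to completion, because there is no `.await` where the executor could switch away from it.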


If the current thread is executing a tight loop, then that is what it is doing; it cannot be executing anything else. A thread has a single thread of execution, by definition. It doesn't stop OTHER threads from executing, though. Perhaps you misstated your question (or perhaps I have misunderstood it; maybe you mean another thread is executing a tight loop?).
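A small sketch of that point, assuming an operating system with preemptive scheduling (or more than one core): one thread spins in a tight loop while the calling thread keeps making progress. The function name `run_beside_spinner` is made up for this example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Spawns a thread that spins in a tight loop and returns how many steps
// the calling thread completed while the spinner was running.
fn run_beside_spinner(steps: u32) -> u32 {
    let stop = Arc::new(AtomicBool::new(false));
    let spinner = {
        let stop = Arc::clone(&stop);
        thread::spawn(move || {
            // Tight loop: no sleep, no I/O, no blocking call.
            while !stop.load(Ordering::Relaxed) {
                std::hint::spin_loop();
            }
        })
    };

    // The calling thread still gets to run its own work.
    let mut completed = 0;
    for _ in 0..steps {
        thread::sleep(Duration::from_millis(1));
        completed += 1;
    }

    // Tell the spinner to stop and wait for it to exit.
    stop.store(true, Ordering::Relaxed);
    spinner.join().unwrap();
    completed
}

fn main() {
    let completed = run_beside_spinner(5);
    println!("completed {completed} steps while another thread was spinning");
}
```

The spinning thread only occupies itself. It is stopped through a shared flag here because a thread cannot be forcibly interrupted from the outside using safe standard-library calls.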


The Rust language doesn't have a runtime.


It may well do so. It all depends on the operating system.

Some operating systems will not preemptively switch execution from thread to thread. They may not reschedule threads unless a call is made into the kernel. So a while true {...} loop in one thread can totally halt any progress on any other thread.

Even in operating systems that support preemption a high priority thread may well block other lower priority threads as it loops.

Rust threads do not specify or give us control over any of this. As far as I know, correct me if I am wrong.

Per Wikipedia: "Today, nearly all operating systems support preemptive multitasking, including the current versions of Windows, macOS, Linux (including Android) and iOS."

I highly doubt the original poster is running an operating system without preemptive multitasking; such systems died out many decades ago.


I guess you are right. The last time I used a cooperative scheduler it was running on top of MS-DOS. It was nice, no worries about atomics or messing with mutexes etc.

Nowadays an endless loop will hang an async runtime and block the progress of other threads. But we tend to call those "tasks" rather than "threads".

Here is a simple case of cooperative tasks using a small mini-runtime:

//#![feature(future_join)]
//use std::future::join; // this provides `join!` but requires Nightly Rust

// We use `join` from the `futures` crate instead to run on stable Rust
use futures::future::join;

async fn task1() {
    for i in 1..=3 {
        println!("task1, step{i}");
        my_little_runtime::yield_task().await;
    }
}

async fn task2() {
    for i in 1..=3 {
        println!("task2, step{i}");
        my_little_runtime::yield_task().await;
    }
}

fn main() {
    my_little_runtime::block_on(join(task1(), task2()));
}

mod my_little_runtime {
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::{self, AtomicBool};
    use std::task::{Context, Poll, Wake, Waker};

    pub struct Yield { ready: bool }
    
    pub fn yield_task() -> Yield {
        Yield { ready: false }
    }
    
    impl Future for Yield {
        type Output = ();
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.ready {
                Poll::Ready(())
            } else {
                self.ready = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
    
    pub fn block_on<R, F: Future<Output = R>>(future: F) -> R {
        struct Wk { woken: AtomicBool }
        impl Wake for Wk {
            fn wake(self: Arc<Self>) {
                self.woken.store(true, atomic::Ordering::SeqCst) // which ordering here?
            }
            fn wake_by_ref(self: &Arc<Self>) {
                self.woken.store(true, atomic::Ordering::SeqCst) // which ordering here?
            }
        }
        let wk = Arc::new(Wk { woken: AtomicBool::new(false) });
        let waker = Waker::from(wk.clone());
        let mut context = Context::from_waker(&waker);
        let mut future = Box::pin(future);
        loop {
            let poll = future.as_mut().poll(&mut context);
            if let Poll::Ready(retval) = poll {
                return retval;
            }
            if !wk.woken.load(atomic::Ordering::SeqCst) { // which ordering here?
                panic!("Dornröschen");
            }
            wk.woken.store(false, atomic::Ordering::SeqCst); // which ordering here?
        }
    }
}


Output:

task1, step1
task2, step1
task1, step2
task2, step2
task1, step3
task2, step3
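As a possible variation on `block_on`, here is a sketch that parks the calling thread instead of tracking a `woken` flag, which sidesteps the question of which atomic ordering to pick. It follows the pattern from the standard library's `std::task::Wake` documentation; the names `block_on_parking`, `ThreadWaker`, `YieldNow` and `count_steps` are made up for this example and are not part of the code above.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waker that unparks the thread which is blocked in `block_on_parking`.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
    fn wake_by_ref(self: &Arc<Self>) {
        self.0.unpark();
    }
}

// Drives a future to completion, sleeping between polls until it is woken.
fn block_on_parking<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // If `unpark` was already called, `park` returns immediately.
            // A spurious wakeup only causes one extra poll.
            Poll::Pending => thread::park(),
        }
    }
}

// Same idea as `Yield` above: wake ourselves, then return Pending once.
struct YieldNow(bool);
impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Small task that yields after every step and returns the step count.
async fn count_steps() -> u32 {
    let mut steps = 0;
    for _ in 0..3 {
        steps += 1;
        YieldNow(false).await;
    }
    steps
}

fn main() {
    println!("steps: {}", block_on_parking(count_steps()));
}
```

The trade-off is that a future which returns `Poll::Pending` without ever arranging a wakeup leaves this version parked forever, whereas the flag-based version above panics in that situation.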


And here is an example with (preemptive) threads:

use std::time::Duration;
use std::thread::sleep;

fn task1() {
    for i in 1..=3 {
        println!("task1, step{i}");
        sleep(Duration::from_millis(3));
    }
}

fn task2() {
    for i in 1..=3 {
        println!("task2, step{i}");
        sleep(Duration::from_millis(7));
    }
}

fn main() {
    let join1 = std::thread::spawn(task1);
    let join2 = std::thread::spawn(task2);
    join1.join().unwrap();
    join2.join().unwrap();
}


Output:

task2, step1
task1, step1
task1, step2
task1, step3
task2, step2
task2, step3

