Help debugging inf loop

I am trying to modify mini-tokio to only depend on std. Instead, I have somehow gotten this to infinite loop instead. Any insights on how to debug this?

The park code is from modifying: GitHub - spacejam/extreme: extremely boring async function runner!

Thanks.

use std::{
    cell::RefCell,
    collections::VecDeque,
    future::Future,
    pin::Pin,
    sync::{Arc, Condvar, Mutex},
    task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
    thread,
    time::{Duration, Instant},
};

// ========== Park

#[derive(Default)]
struct Park {
    mutex: Mutex<bool>,
    cond_var: Condvar,}

impl Park {
    pub fn unpark(&self) {
        *self.mutex.lock().unwrap() = true;
        self.cond_var.notify_one();}

    pub fn park(&self) {
        let mut runnable = self.mutex.lock().unwrap();
        while !*runnable {
            runnable = self.cond_var.wait(runnable).unwrap();}}}

// ========== Tasks ready to run

pub struct ReadyTasks {
    park: Park,
    tasks: Mutex<VecDeque<Arc<Task>>>,}

impl ReadyTasks {
    pub fn new() -> (ReadyTasks_Sender, ReadyTasks_Receiver) {
        let v = ReadyTasks {
            park: Park::default(),
            tasks: Mutex::new(VecDeque::new()),};
        let v = Arc::new(v);
        (
            ReadyTasks_Sender { tasks: v.clone()},
            ReadyTasks_Receiver { tasks: v.clone()},)}}

pub struct ReadyTasks_Receiver {
    tasks: Arc<ReadyTasks>,}

impl ReadyTasks_Receiver {
    pub fn recv(&self) -> Result<Arc<Task>, ()> {
        let mut v = {
            let mut t = self.tasks.tasks.lock().unwrap();
            t.pop_front()};
        while v.is_none() {
            self.tasks.park.park();
            v = {
                let mut t = self.tasks.tasks.lock().unwrap();
                t.pop_front()};}
        Ok(v.unwrap())}}

#[derive(Clone)]
pub struct ReadyTasks_Sender {
    tasks: Arc<ReadyTasks>,}

impl ReadyTasks_Sender {
    pub fn send(&self, task: Arc<Task>) {
        {
            let mut t = self.tasks.tasks.lock().unwrap();
            t.push_back(task);}
        self.tasks.park.unpark();}}

fn main() {
    let mini_tokio = MiniTokio::new();

    // Spawn the root task. All other tasks are spawned from the context of this
    // root task. No work happens until `mini_tokio.run()` is called.
    mini_tokio.spawn(async {
        // Spawn a task
        spawn(async {
            // Wait for a little bit of time so that "world" is printed after
            // "hello"
            delay(Duration::from_millis(100)).await;
            println!("world");});

        // Spawn a second task
        spawn(async {
            println!("hello");});

        // We haven't implemented executor shutdown, so force the process to exit.
        delay(Duration::from_millis(200)).await;
        std::process::exit(0);});

    // Start the mini-tokio executor loop. Scheduled tasks are received and
    // executed.
    mini_tokio.run();}

/// A very basic futures executor based on a channel. When tasks are woken, they
/// are scheduled by queuing them in the send half of the channel. The executor
/// waits on the receive half and executes received tasks.
///
/// When a task is executed, the send half of the channel is passed along via
/// the task's Waker.

struct MiniTokio {
    // Receives scheduled tasks. When a task is scheduled, the associated future
    // is ready to make progress. This usually happens when a resource the task
    // uses becomes ready to perform an operation. For example, a socket
    // received data and a `read` call will succeed.
    scheduled: ReadyTasks_Receiver,

    // Send half of the scheduled channel.
    sender: ReadyTasks_Sender,}

impl MiniTokio {
    /// Initialize a new mini-tokio instance.
    fn new() -> MiniTokio {
        let (sender, scheduled) = ReadyTasks::new();

        MiniTokio { scheduled, sender}}

    /// Spawn a future onto the mini-tokio instance.
    ///
    /// The given future is wrapped with the `Task` harness and pushed into the
    /// `scheduled` queue. The future will be executed when `run` is called.
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,{
        Task::spawn(future, &self.sender);}

    /// Run the executor.
    ///
    /// This starts the executor loop and runs it indefinitely. No shutdown
    /// mechanism has been implemented.
    ///
    /// Tasks are popped from the `scheduled` channel receiver. Receiving a task
    /// on the channel signifies the task is ready to be executed. This happens
    /// when the task is first created and when its waker has been used.
    fn run(&self) {
        // Set the CURRENT thread-local to point to the current executor.
        //
        // Tokio uses a thread-local variable to implement `tokio::spawn`. When
        // entering the runtime, the executor stores necessary context with the
        // thread-local to support spawning new tasks.
        CURRENT.with(|cell| {
            *cell.borrow_mut() = Some(self.sender.clone());});

        // The executor loop. Scheduled tasks are received. If the channel is
        // empty, the thread blocks until a task is received.
        while let Ok(task) = self.scheduled.recv() {
            // Execute the task until it either completes or cannot make further
            // progress and returns `Poll::Pending`.
            task.poll();}}}

// An equivalent to `tokio::spawn`. When entering the mini-tokio executor, the
// `CURRENT` thread-local is set to point to that executor's channel's Send
// half. Then, spawning requires creating the `Task` harness for the given
// `future` and pushing it into the scheduled queue.
pub fn spawn<F>(future: F)
where
    F: Future<Output = ()> + Send + 'static,{
    CURRENT.with(|cell| {
        let borrow = cell.borrow();
        let sender = borrow.as_ref().unwrap();
        Task::spawn(future, sender);});}

// Asynchronous equivalent to `thread::sleep`. Awaiting on this function pauses
// for the given duration.
//
// mini-tokio implements delays by spawning a timer thread that sleeps for the
// requested duration and notifies the caller once the delay completes. A thread
// is spawned **per** call to `delay`. This is obviously a terrible
// implementation strategy and nobody should use this in production. Tokio does
// not use this strategy. However, it can be implemented with few lines of code,
// so here we are.
async fn delay(dur: Duration) {
    // `delay` is a leaf future. Sometimes, this is refered to as a "resource".
    // Other resources include sockets and channels. Resources may not be
    // implemented in terms of `async/await` as they must integrate with some
    // operating system detail. Because of this, we must manually implement the
    // `Future`.
    //
    // However, it is nice to expose the API as an `async fn`. A useful idiom is
    // to manually define a private future and then use it from a public `async
    // fn` API.
    struct Delay {
        // When to complete the delay.
        when: Instant,
        // The waker to notify once the delay has completed. The waker must be
        // accessible by both the timer thread and the future so it is wrapped
        // with `Arc<Mutex<_>>`
        waker: Option<Arc<Mutex<Waker>>>,}

    impl Future for Delay {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            // First, if this is the first time the future is called, spawn the
            // timer thread. If the timer thread is already running, ensure the
            // stored `Waker` matches the current task's waker.
            if let Some(waker) = &self.waker {
                let mut waker = waker.lock().unwrap();

                // Check if the stored waker matches the current task's waker.
                // This is necessary as the `Delay` future instance may move to
                // a differnt task between calls to `poll`. If this happens, the
                // waker contained by the given `Context` will differ and we
                // must update our stored waker to reflect this change.
                if !waker.will_wake(cx.waker()) {
                    *waker = cx.waker().clone();}}
            else {
                let when = self.when;
                let waker = Arc::new(Mutex::new(cx.waker().clone()));
                self.waker = Some(waker.clone());

                // This is the first time `poll` is called, spawn the timer thread.
                thread::spawn(move || {
                    let now = Instant::now();

                    if now < when {
                        thread::sleep(when - now);}

                    // The duration has elapsed. Notify the caller by invoking
                    // the waker.
                    let waker = waker.lock().unwrap();
                    waker.wake_by_ref();});}

            // Once the waker is stored and the timer thread is started, it is
            // time to check if the delay has completed. This is done by
            // checking the current instant. If the duration has elapsed, then
            // the future has completed and `Poll::Ready` is returned.
            if Instant::now() >= self.when {
                Poll::Ready(())}
            else {
                // The duration has not elapsed, the future has not completed so
                // return `Poll::Pending`.
                //
                // The `Future` trait contract requires that when `Pending` is
                // returned, the future ensures that the given waker is signaled
                // once the future should be polled again. In our case, by
                // returning `Pending` here, we are promising that we will
                // invoke the given waker included in the `Context` argument
                // once the requested duration has elapsed. We ensure this by
                // spawning the timer thread above.
                //
                // If we forget to invoke the waker, the task will hang
                // indefinitely.
                Poll::Pending}}}

    // Create an instance of our `Delay` future.
    let future = Delay {
        when: Instant::now() + dur,
        waker: None,};

    // Wait for the duration to complete.
    future.await;}

// Used to track the current mini-tokio instance so that the `spawn` function is
// able to schedule spawned tasks.
thread_local! {
    static CURRENT: RefCell<Option<ReadyTasks_Sender >> =
        RefCell::new(None);
}

// Task harness. Contains the future as well as the necessary data to schedule
// the future once it is woken.
pub struct Task {
    // The future is wrapped with a `Mutex` to make the `Task` structure `Sync`.
    // There will only ever be a single thread that attempts to use `future`.
    // The Tokio runtime avoids the mutex by using `unsafe` code. The box is
    // also avoided.
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,

    // When a task is notified, it is queued into this channel. The executor
    // pops notified tasks and executes them.
    executor: ReadyTasks_Sender,}

impl Task {
    // Spawns a new task with the given future.
    //
    // Initializes a new Task harness containing the given future and pushes it
    // onto `sender`. The receiver half of the channel will get the task and
    // execute it.
    fn spawn<F>(future: F, sender: &ReadyTasks_Sender)
    where
        F: Future<Output = ()> + Send + 'static,{
        let task = Arc::new(Task {
            future: Mutex::new(Box::pin(future)),
            executor: sender.clone(),});

        let _ = sender.send(task);}

    // Execute a scheduled task. This creates the necessary `task::Context`
    // containing a waker for the task. This waker pushes the task onto the
    // mini-tokio scheduled channel. The future is then polled with the waker.
    fn poll(self: Arc<Self>) {
        // Get a waker referencing the task.
        // let waker = todo!(); // task::waker(self.clone());
        // Initialize the task context with the waker.

        // self.future;

        // <F: std::future::Future>(mut f: F)

        // let mut f = unsafe { std::pin::Pin::new_unchecked(&mut f) };
        let park = Arc::new(Park::default());
        let sender = Arc::into_raw(park.clone());

        let raw_waker = RawWaker::new(sender as *const _, &VTABLE);
        let waker = unsafe { Waker::from_raw(raw_waker) };

        let mut cx = Context::from_waker(&waker);

        // This will never block as only a single thread ever locks the future.
        let mut future = self.future.try_lock().unwrap();

        // Poll the future
        let _ = future.as_mut().poll(&mut cx);}}

#[test]
fn test_00() {
    main();}

static VTABLE: RawWakerVTable = RawWakerVTable::new(
    |clone_me| unsafe {
        let arc = Arc::from_raw(clone_me as *const Park);
        std::mem::forget(arc.clone());
        RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)},
    |wake_me| unsafe { Arc::from_raw(wake_me as *const Park).unpark() },
    |wake_by_ref_me| unsafe { (&*(wake_by_ref_me as *const Park)).unpark() },
    |drop_me| unsafe { drop(Arc::from_raw(drop_me as *const Park)) },);

Once you have unparked once, the boolean remains true forever so park doesn't work properly. Generally you would fix this by setting it back to false when you return.

However, that is not the bug. The bug is that your waker doesn't actually send the task to the runtime, so even though you unparked the receiver, it will just find that the VecDeque is still empty and go around the loop again.

Also, I wish you would rustfmt your code before you post it.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.