I am trying to modify mini-tokio to only depend on std. Instead, I have somehow gotten this to infinite loop instead. Any insights on how to debug this?
The park code is from modifying: GitHub - spacejam/extreme: extremely boring async function runner!
Thanks.
use std::{
cell::RefCell,
collections::VecDeque,
future::Future,
pin::Pin,
sync::{Arc, Condvar, Mutex},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
thread,
time::{Duration, Instant},
};
// ========== Park
#[derive(Default)]
struct Park {
mutex: Mutex<bool>,
cond_var: Condvar,}
impl Park {
pub fn unpark(&self) {
*self.mutex.lock().unwrap() = true;
self.cond_var.notify_one();}
pub fn park(&self) {
let mut runnable = self.mutex.lock().unwrap();
while !*runnable {
runnable = self.cond_var.wait(runnable).unwrap();}}}
// ========== Tasks ready to run
pub struct ReadyTasks {
park: Park,
tasks: Mutex<VecDeque<Arc<Task>>>,}
impl ReadyTasks {
pub fn new() -> (ReadyTasks_Sender, ReadyTasks_Receiver) {
let v = ReadyTasks {
park: Park::default(),
tasks: Mutex::new(VecDeque::new()),};
let v = Arc::new(v);
(
ReadyTasks_Sender { tasks: v.clone()},
ReadyTasks_Receiver { tasks: v.clone()},)}}
pub struct ReadyTasks_Receiver {
tasks: Arc<ReadyTasks>,}
impl ReadyTasks_Receiver {
pub fn recv(&self) -> Result<Arc<Task>, ()> {
let mut v = {
let mut t = self.tasks.tasks.lock().unwrap();
t.pop_front()};
while v.is_none() {
self.tasks.park.park();
v = {
let mut t = self.tasks.tasks.lock().unwrap();
t.pop_front()};}
Ok(v.unwrap())}}
#[derive(Clone)]
pub struct ReadyTasks_Sender {
tasks: Arc<ReadyTasks>,}
impl ReadyTasks_Sender {
pub fn send(&self, task: Arc<Task>) {
{
let mut t = self.tasks.tasks.lock().unwrap();
t.push_back(task);}
self.tasks.park.unpark();}}
fn main() {
let mini_tokio = MiniTokio::new();
// Spawn the root task. All other tasks are spawned from the context of this
// root task. No work happens until `mini_tokio.run()` is called.
mini_tokio.spawn(async {
// Spawn a task
spawn(async {
// Wait for a little bit of time so that "world" is printed after
// "hello"
delay(Duration::from_millis(100)).await;
println!("world");});
// Spawn a second task
spawn(async {
println!("hello");});
// We haven't implemented executor shutdown, so force the process to exit.
delay(Duration::from_millis(200)).await;
std::process::exit(0);});
// Start the mini-tokio executor loop. Scheduled tasks are received and
// executed.
mini_tokio.run();}
/// A very basic futures executor based on a channel. When tasks are woken, they
/// are scheduled by queuing them in the send half of the channel. The executor
/// waits on the receive half and executes received tasks.
///
/// When a task is executed, the send half of the channel is passed along via
/// the task's Waker.
struct MiniTokio {
// Receives scheduled tasks. When a task is scheduled, the associated future
// is ready to make progress. This usually happens when a resource the task
// uses becomes ready to perform an operation. For example, a socket
// received data and a `read` call will succeed.
scheduled: ReadyTasks_Receiver,
// Send half of the scheduled channel.
sender: ReadyTasks_Sender,}
impl MiniTokio {
/// Initialize a new mini-tokio instance.
fn new() -> MiniTokio {
let (sender, scheduled) = ReadyTasks::new();
MiniTokio { scheduled, sender}}
/// Spawn a future onto the mini-tokio instance.
///
/// The given future is wrapped with the `Task` harness and pushed into the
/// `scheduled` queue. The future will be executed when `run` is called.
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,{
Task::spawn(future, &self.sender);}
/// Run the executor.
///
/// This starts the executor loop and runs it indefinitely. No shutdown
/// mechanism has been implemented.
///
/// Tasks are popped from the `scheduled` channel receiver. Receiving a task
/// on the channel signifies the task is ready to be executed. This happens
/// when the task is first created and when its waker has been used.
fn run(&self) {
// Set the CURRENT thread-local to point to the current executor.
//
// Tokio uses a thread-local variable to implement `tokio::spawn`. When
// entering the runtime, the executor stores necessary context with the
// thread-local to support spawning new tasks.
CURRENT.with(|cell| {
*cell.borrow_mut() = Some(self.sender.clone());});
// The executor loop. Scheduled tasks are received. If the channel is
// empty, the thread blocks until a task is received.
while let Ok(task) = self.scheduled.recv() {
// Execute the task until it either completes or cannot make further
// progress and returns `Poll::Pending`.
task.poll();}}}
// An equivalent to `tokio::spawn`. When entering the mini-tokio executor, the
// `CURRENT` thread-local is set to point to that executor's channel's Send
// half. Then, spawning requires creating the `Task` harness for the given
// `future` and pushing it into the scheduled queue.
pub fn spawn<F>(future: F)
where
F: Future<Output = ()> + Send + 'static,{
CURRENT.with(|cell| {
let borrow = cell.borrow();
let sender = borrow.as_ref().unwrap();
Task::spawn(future, sender);});}
// Asynchronous equivalent to `thread::sleep`. Awaiting on this function pauses
// for the given duration.
//
// mini-tokio implements delays by spawning a timer thread that sleeps for the
// requested duration and notifies the caller once the delay completes. A thread
// is spawned **per** call to `delay`. This is obviously a terrible
// implementation strategy and nobody should use this in production. Tokio does
// not use this strategy. However, it can be implemented with few lines of code,
// so here we are.
async fn delay(dur: Duration) {
// `delay` is a leaf future. Sometimes, this is refered to as a "resource".
// Other resources include sockets and channels. Resources may not be
// implemented in terms of `async/await` as they must integrate with some
// operating system detail. Because of this, we must manually implement the
// `Future`.
//
// However, it is nice to expose the API as an `async fn`. A useful idiom is
// to manually define a private future and then use it from a public `async
// fn` API.
struct Delay {
// When to complete the delay.
when: Instant,
// The waker to notify once the delay has completed. The waker must be
// accessible by both the timer thread and the future so it is wrapped
// with `Arc<Mutex<_>>`
waker: Option<Arc<Mutex<Waker>>>,}
impl Future for Delay {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// First, if this is the first time the future is called, spawn the
// timer thread. If the timer thread is already running, ensure the
// stored `Waker` matches the current task's waker.
if let Some(waker) = &self.waker {
let mut waker = waker.lock().unwrap();
// Check if the stored waker matches the current task's waker.
// This is necessary as the `Delay` future instance may move to
// a differnt task between calls to `poll`. If this happens, the
// waker contained by the given `Context` will differ and we
// must update our stored waker to reflect this change.
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();}}
else {
let when = self.when;
let waker = Arc::new(Mutex::new(cx.waker().clone()));
self.waker = Some(waker.clone());
// This is the first time `poll` is called, spawn the timer thread.
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);}
// The duration has elapsed. Notify the caller by invoking
// the waker.
let waker = waker.lock().unwrap();
waker.wake_by_ref();});}
// Once the waker is stored and the timer thread is started, it is
// time to check if the delay has completed. This is done by
// checking the current instant. If the duration has elapsed, then
// the future has completed and `Poll::Ready` is returned.
if Instant::now() >= self.when {
Poll::Ready(())}
else {
// The duration has not elapsed, the future has not completed so
// return `Poll::Pending`.
//
// The `Future` trait contract requires that when `Pending` is
// returned, the future ensures that the given waker is signaled
// once the future should be polled again. In our case, by
// returning `Pending` here, we are promising that we will
// invoke the given waker included in the `Context` argument
// once the requested duration has elapsed. We ensure this by
// spawning the timer thread above.
//
// If we forget to invoke the waker, the task will hang
// indefinitely.
Poll::Pending}}}
// Create an instance of our `Delay` future.
let future = Delay {
when: Instant::now() + dur,
waker: None,};
// Wait for the duration to complete.
future.await;}
// Used to track the current mini-tokio instance so that the `spawn` function is
// able to schedule spawned tasks.
thread_local! {
static CURRENT: RefCell<Option<ReadyTasks_Sender >> =
RefCell::new(None);
}
// Task harness. Contains the future as well as the necessary data to schedule
// the future once it is woken.
pub struct Task {
// The future is wrapped with a `Mutex` to make the `Task` structure `Sync`.
// There will only ever be a single thread that attempts to use `future`.
// The Tokio runtime avoids the mutex by using `unsafe` code. The box is
// also avoided.
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
// When a task is notified, it is queued into this channel. The executor
// pops notified tasks and executes them.
executor: ReadyTasks_Sender,}
impl Task {
// Spawns a new task with the given future.
//
// Initializes a new Task harness containing the given future and pushes it
// onto `sender`. The receiver half of the channel will get the task and
// execute it.
fn spawn<F>(future: F, sender: &ReadyTasks_Sender)
where
F: Future<Output = ()> + Send + 'static,{
let task = Arc::new(Task {
future: Mutex::new(Box::pin(future)),
executor: sender.clone(),});
let _ = sender.send(task);}
// Execute a scheduled task. This creates the necessary `task::Context`
// containing a waker for the task. This waker pushes the task onto the
// mini-tokio scheduled channel. The future is then polled with the waker.
fn poll(self: Arc<Self>) {
// Get a waker referencing the task.
// let waker = todo!(); // task::waker(self.clone());
// Initialize the task context with the waker.
// self.future;
// <F: std::future::Future>(mut f: F)
// let mut f = unsafe { std::pin::Pin::new_unchecked(&mut f) };
let park = Arc::new(Park::default());
let sender = Arc::into_raw(park.clone());
let raw_waker = RawWaker::new(sender as *const _, &VTABLE);
let waker = unsafe { Waker::from_raw(raw_waker) };
let mut cx = Context::from_waker(&waker);
// This will never block as only a single thread ever locks the future.
let mut future = self.future.try_lock().unwrap();
// Poll the future
let _ = future.as_mut().poll(&mut cx);}}
#[test]
fn test_00() {
main();}
static VTABLE: RawWakerVTable = RawWakerVTable::new(
|clone_me| unsafe {
let arc = Arc::from_raw(clone_me as *const Park);
std::mem::forget(arc.clone());
RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)},
|wake_me| unsafe { Arc::from_raw(wake_me as *const Park).unpark() },
|wake_by_ref_me| unsafe { (&*(wake_by_ref_me as *const Park)).unpark() },
|drop_me| unsafe { drop(Arc::from_raw(drop_me as *const Park)) },);