I tried to model the scenario with the following example:
use std::{future::Future, task::Poll};

#[tokio::main]
async fn main() {
    let time1 = Box::pin(tokio::time::sleep(std::time::Duration::from_secs(4))); // #1
    let time2 = Box::pin(tokio::time::sleep(std::time::Duration::from_secs(3))); // #2
    let mut tasks = vec![time1, time2];
    std::future::poll_fn(|cx| {
        println!("start polling");
        std::thread::sleep(std::time::Duration::from_secs(2)); // #3
        for (index, task) in tasks.iter_mut().enumerate() {
            match task.as_mut().poll(cx) {
                Poll::Ready(_) => {
                    println!("task {} ready", index);
                    return Poll::Ready(());
                }
                Poll::Pending => {
                    println!("task {} pending", index);
                }
            }
        }
        Poll::<()>::Pending
    }).await;
}
#2 should complete before #1, so the waker that #2 holds should be called first. I added #3 so the code can model the scenario: when #2 is ready, it internally calls the wake method so the task resumes and takes #2's result. #3 delays the re-poll long enough that #1 is also complete by the time #2 has woken the task; in the loop, #1 is checked first, found ready, and returned.
That is, #1 is indirectly "awakened" by #2 rather than by the waker it holds. Conversely, #2 should have woken the task for its own result by calling its waker, but it accidentally woke it for #1's. Is this behavior an intended part of Rust's async design?
PS: #1 and #2 denote two tasks that could complete at nearly the same time.
Update:
The detailed scenario is: two or more tasks that could complete at roughly the same time are polled together in the same Future. One of them calls cx.waker() to re-schedule the Future, Future::poll is re-entered, and another task happens to be ready already; the net result is that the task that called cx.waker() is not the one whose result is taken.
I would assume that by "wake" you mean waker. Your #3 is not an async wait: it doesn't yield to the scheduler, but instead blocks the worker thread that is polling the current task, and it doesn't register or schedule any waker with the reactor.
#1 and #2, however, schedule their timers into the tokio reactor's timer queue. I'm not familiar with the exact details of how tokio's reactor is implemented, but I would guess the reactor has its own thread and manages IO events using non-blocking system calls.
When a timer fires, it invokes the associated waker, which in this case is a waker provided by the tokio runtime, and that re-schedules the task onto the ready queue.
I'm confused by your description of the expected behavior, because the timers of #1 and #2 aren't even polled until after #3 returns, and async timers don't start before they are polled for the first time.
Please provide a bit more context to help me understand the problem.
#3 just ensures that #1 is already complete at the moment #2 calls the waker and tokio re-schedules the task.
I tried to diagram the timeline:
start polling
|
blocking delay // #3
|
polling `#1` -> Pending
|
polling `#2` -> Pending
|
suspend
|
`#2` completes first and calls the `waker`, making `tokio` re-schedule the task
|
start polling
|
blocking delay // #3 ensures that `#1` is already complete after `#2` wakes the task
|
polling `#1` -> Ready
|
return
In this timeline, #2's waker accidentally wakes the task up for #1's result.
Anyway, #3 just guarantees to reproduce the described scenario.
The waker is a property of the polling context, which is created at the top level, i.e. inside the executor, and shared between all the futures in the current task's tree. In other words, when the executor polls a task, every future polled by that task shares the same waker. In your case, every future polled inside the poll_fn callback uses the waker passed into poll_fn.
That is, when poll_fn is re-entered, we cannot determine which future's waker call made tokio re-schedule the task. In other words, a future can be "awakened" by an event belonging to another future, and that is expected behavior in this scenario, right?
To be pedantic, the timer created in #1 is not a task itself; "task" usually refers to a top-level Future that is spawned onto the runtime's scheduler/queue/executor.
As @Cerber-Ursi said, a waker is provided by the polling context, and it can be registered with multiple event sources (possibly clone()-ed as needed, but all clones remain associated with the same top-level task).
The runtime simply polls the top-level task; it is up to the task's poll() function to do whatever it needs with its "child" futures. If I understand your problem correctly, it is really about two futures (#1 and #2) racing, and it is up to the application logic to handle such cases.
I'll use future combinators from the futures-lite crate to give some examples:
You can give the futures priorities, as the futures_lite::or() combinator does; priority here means the order of polling. This is the behavior in your example, where timer #1 has priority over timer #2. futures_lite::or() combined with Either is similar to futures::select().
You can pick either one with a "fair" chance, like the futures_lite::race() combinator; there's no equivalent combinator in the futures crate as far as I know.
You can wait for both of them to be ready, like the futures_lite::zip() combinator.
All the futures are polled with the same waker. (Technically it's possible to create "derived" wakers from a parent waker, but eventually the "root" waker must be called, because that's how the scheduler knows which task to poll again.)
s/task/future/g
I think your question can be answered by this statement: Future authors SHOULD handle spurious wakeups correctly. (In the extreme case, a future might be poll()-ed repeatedly in a tight loop; although this is inefficient, the future implementation should still give the correct result.)
Maybe an example is easier to understand. Below is a snippet implementing a simple timeout mechanism using the futures_lite::or() combinator:
use std::future::Future;
use std::time::Duration;

use futures_lite::future::or;

async fn with_timeout<F: Future>(timeout: Duration, f: F) -> Option<F::Output> {
    or(
        // the inner future `f` resolved, return `Some`
        async {
            let result = f.await;
            Some(result)
        },
        // the timer fired, return `None`
        async {
            tokio::time::sleep(timeout).await;
            None
        },
    )
    .await
}
In this example, it is possible that the timer resolves first, but by the time the task is polled again the inner future f has also resolved (even though the reactor has not invoked its waker yet); the combinator will happily return f's result and discard the timeout event. That is a perfectly valid outcome.