Async code that automates moving between threads

I don't understand the guts of async in Rust.
After reading the async book I can use async libraries, but I don't properly understand the mechanism behind them.

For instance, from examples like this one ( https://wandbox.org/permlink/QVminRtb6s09lNun ) I understand how to write linear-looking C++ code that actually runs on different threads:
CoroTask CoroToDealWith() {
    InCurrentThread();

    co_await writerQueue; // => Go to writerQueue
    InWriterThread1();
    if (NeedNetwork()) {
        co_yield networkQueue; // => Go to networkQueue
        auto v = InNetworkThread();
        if (v) {
            co_yield UIQueue; // => Go to UIQueue
            InUIThread();
        }
    }

    co_yield writerQueue; // => Go to writerQueue
    InWriterThread2();
    ShutdownAll();
}

But Rust has a quite different async model.
How can I organize such behaviour there?
As I understand it, it would have to be written like this:

enum WorkerThread {
    GENERATOR,
    DOWNLOADER,
    PARSER,
    SAVER,
}
async fn function() -> Result<(), ()> {
    // After that point future must be sent into the queue in new thread.
    mover::move_to(WorkerThread::GENERATOR).await?;
    let url: String = generate_url();
    ...
    
    mover::move_to(WorkerThread::DOWNLOADER).await?; // <-- Move again.
    let payload: String = downloader::get(url);
    ...
    
    mover::move_to(WorkerThread::PARSER).await?; // <-- Move again.
    let parsed: JSON = JSON::parse(payload);
    ...
    
    mover::move_to(WorkerThread::SAVER).await?; // <-- Move again.
    save_to_file(parsed, "file.json");
    ...
}

But which part of the infrastructure is responsible for moving the async execution to another thread?

The piece that moves the future between threads is the runtime, which would typically be Tokio. The Tokio codebase contains a work-stealing threaded runtime that can move a future to another thread while it is currently suspended at an .await.

You don't have very much control over which thread your future runs on.

Generally the only time you'd explicitly ask for a piece of code in an async program to be run on a separate thread would be if it was CPU-bound/blocking (e.g. if you're using a database driver that's not async-aware). Runtimes usually offer a way to spawn these tasks onto a separate thread.

Everything else is usually managed by the runtime.

As far as I understand, that's true for Tokio's futures in the Tokio runtime.
But I mean, how would it work if I decided to implement my own futures with my own runtime?

PS: There's no practical purpose here, I just want to understand how it works.

All runtimes use the same future type, which is provided by the standard library. If you implement your own runtime, you can have a function that signals to the runtime that a future wants to be moved to another thread using e.g. a thread-local or channel.

This is exactly the question.
In which part of the async infrastructure does it have to be done: Spawner, Waker, Task, Executor, or the Future implementation?
I just can't understand the exact connection between these parts.
And, more importantly, I can't understand where the boundaries of the scheduler are in Rust's async.

A Future is an object with a poll method that you can repeatedly call to drive the future to completion. Executing a future consists of calling this poll method a bunch of times. Additionally, poll takes a Waker as an argument (through a context), which the future promises to call wake() on when the future is ready to make more progress. E.g. it doesn't make sense to call poll() on a timer before the timeout has elapsed, so the poll method of the timer has to somehow ensure that wake() is called on the waker once the timer elapses.
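To make the poll/wake contract concrete, here is a minimal sketch using only the standard library. The names `Event`, `Trigger`, `CountingWaker` and `poll_by_hand` are made up for illustration; they are not part of any runtime. The future stores the waker it was polled with, and whoever completes it calls `wake()` on that stored waker.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// State shared between the future and whoever completes it.
#[derive(Default)]
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

/// Hypothetical one-shot event: becomes ready once `Trigger::fire` is called.
struct Event(Arc<Mutex<Shared>>);
struct Trigger(Arc<Mutex<Shared>>);

fn event() -> (Event, Trigger) {
    let shared = Arc::new(Mutex::new(Shared::default()));
    (Event(shared.clone()), Trigger(shared))
}

impl Future for Event {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut shared = self.0.lock().unwrap();
        if shared.done {
            Poll::Ready(())
        } else {
            // Not ready yet: keep the waker so `fire` can notify the executor.
            shared.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

impl Trigger {
    fn fire(&self) {
        let waker = {
            let mut shared = self.0.lock().unwrap();
            shared.done = true;
            shared.waker.take()
        };
        // Wake outside the lock: this tells the executor "poll me again".
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

/// Stand-in for an executor's waker: it only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls an `Event` by hand, the way an executor would.
/// Returns (first poll was pending, number of wake-ups, second poll was ready).
fn poll_by_hand() -> (bool, usize, bool) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let (ev, trigger) = event();
    let mut ev = Box::pin(ev);

    let first_pending = ev.as_mut().poll(&mut cx).is_pending();
    trigger.fire();
    let wakes = counter.0.load(Ordering::SeqCst);
    let second_ready = ev.as_mut().poll(&mut cx).is_ready();
    (first_pending, wakes, second_ready)
}
```

Calling `poll_by_hand()` from `main` walks through one pending poll, one wake-up and one ready poll. A real executor would react to the wake-up by putting the task back on its run queue instead of just counting.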

Note that most futures are implemented with multiple sub-futures, and the poll method of the outer future is implemented by calling poll on the child futures. Thus, a future often has a tree of sub-futures inside it. A Task refers to a top-level future that has no parent future, which is independently spawned on an executor.

An Executor or Scheduler is a component that stores multiple tasks (i.e. top-level futures) and executes them by calling poll on a task whenever that task notifies the executor through the waker it was provided.

There typically is not a component referred to by Spawner. An executor will usually provide some sort of spawn function, which is sometimes a free-standing function (which finds the executor through globals or thread-locals), or a method on some sort of handle to the executor.

A Runtime is a library that provides an executor, typically along with various utilities such as a timer component that manages sending off wake-ups when timers elapse, and a similar utility for IO. Runtime and executor are often used interchangeably.

A task is moved between threads by calling the poll method from a different thread.
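As a rough illustration of that last point, the sketch below polls the same boxed future first on the calling thread and then on a freshly spawned thread. `YieldOnce`, `NoopWaker`, `job` and `poll_on_two_threads` are hypothetical helpers written for this example, and the only requirement on the future is that it is `Send`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, ThreadId};

/// Hypothetical helper future: pending on the first poll, ready on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that does nothing; enough here because we poll by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Records the thread it runs on before and after its suspension point.
async fn job(log: Arc<Mutex<Vec<ThreadId>>>) {
    log.lock().unwrap().push(thread::current().id());
    YieldOnce(false).await;
    log.lock().unwrap().push(thread::current().id());
}

fn poll_on_two_threads() -> Vec<ThreadId> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let mut task: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(job(log.clone()));

    // First poll happens on the calling thread and stops at the `.await`.
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    assert!(task.as_mut().poll(&mut cx).is_pending());

    // Second poll happens on another thread: the task has "moved".
    let handle = thread::spawn(move || {
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut cx = Context::from_waker(&waker);
        assert!(task.as_mut().poll(&mut cx).is_ready());
    });
    handle.join().unwrap();

    let ids = log.lock().unwrap().clone();
    ids
}
```

The two recorded thread ids differ, even though `job` reads like straight-line code. Nothing in the future itself chose the thread; whoever called `poll` did.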

But in this example https://rust-lang.github.io/async-book/02_execution/04_executor.html the Executor only operates on top-level Tasks:

while let Ok(task) = self.ready_queue.recv() {

So how can it run sub-futures, or is it just an incomplete example?

The body of the poll in future.as_mut().poll(context) will call the sub-futures. The sub-futures will be fields in the struct/enum stored inside the BoxFuture<'static, ()>.

In this case, if you spawned foo, then the task would correspond to foo, which has bar as a sub-future, which then has a sub-future baz.

async fn foo() {
    bar().await;
}
async fn bar() {
    baz().await;
}
async fn baz() {
    println!("in baz");
}

Polling foo's future will recursively poll bar and then baz.
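A small way to see this for yourself, as a sketch: the variant below replaces the `println!` with a trace vector (an assumption made so the result can be inspected) and polls `foo` exactly once by hand. `NoopWaker` and `poll_foo_once` are hypothetical helpers, not part of any runtime.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

async fn foo(trace: &mut Vec<&'static str>) {
    trace.push("foo");
    bar(trace).await;
}

async fn bar(trace: &mut Vec<&'static str>) {
    trace.push("bar");
    baz(trace).await;
}

async fn baz(trace: &mut Vec<&'static str>) {
    trace.push("baz");
}

/// Waker that does nothing; enough here because nothing ever suspends.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the top-level future once and reports what ran.
fn poll_foo_once() -> (bool, Vec<&'static str>) {
    let mut trace = Vec::new();
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    // Only `foo` is polled explicitly; `bar` and `baz` are polled from inside it.
    let mut fut = Box::pin(foo(&mut trace));
    let ready = fut.as_mut().poll(&mut cx).is_ready();
    drop(fut);

    (ready, trace)
}
```

The single `poll` call on `foo` completes immediately and the trace contains all three names, so the executor never needs to know that `bar` or `baz` exist.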

So it's impossible to do something like the C++ code in the first message?
Because my handwritten future won't do any magic wrapping of sub-futures into a structure.

It is impossible to do what? I don't understand the question.

To make a future that causes the enclosing async function to wake up on another thread:

async fn function() -> Result<(), ()> {
    // After that point all async execution must be sent into the queue in different thread.
    mover::move_to(WorkerThread::GENERATOR).await?;
    ...

Because futures can change anything downstream but can't change anything upstream (in the async function they are nested in).

The move_to method could signal to the executor using a thread-local?
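Here is one possible sketch of that idea with a toy executor built on the standard library only. Everything in it is hypothetical and written for illustration (`move_to`, `MoveTo`, `REQUESTED`, `worker_loop`, `run_on_workers`, the worker names); workers are addressed by index rather than by the `WorkerThread` enum from the question. The `MoveTo` future writes the target into a thread-local and returns `Pending`; the worker loop reads the thread-local right after `poll` returns and sends the whole task to the target worker's queue, which polls it again.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::{mpsc, Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

enum Msg { Run(Task), Stop }

thread_local! {
    // Written by `MoveTo::poll`, read by the worker loop right after `poll` returns.
    static REQUESTED: Cell<Option<usize>> = Cell::new(None);
}

/// Hypothetical future: suspends once and asks to be resumed on worker `target`.
struct MoveTo { target: usize, requested: bool }

fn move_to(target: usize) -> MoveTo {
    MoveTo { target, requested: false }
}

impl Future for MoveTo {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.requested {
            return Poll::Ready(()); // second poll: already running on the target worker
        }
        self.requested = true;
        let target = self.target;
        REQUESTED.with(|r| r.set(Some(target)));
        Poll::Pending
    }
}

struct NoopWaker; // this toy executor reschedules by itself, so the waker is unused
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn worker_loop(rx: mpsc::Receiver<Msg>, senders: Vec<mpsc::Sender<Msg>>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while let Ok(Msg::Run(mut task)) = rx.recv() {
        match task.as_mut().poll(&mut cx) {
            // The single task finished: tell every worker to shut down.
            Poll::Ready(()) => {
                for tx in &senders {
                    let _ = tx.send(Msg::Stop);
                }
            }
            // The task suspended: forward the whole task to the requested worker.
            Poll::Pending => {
                let target = REQUESTED.with(|r| r.take()).expect("only move_to may suspend here");
                senders[target].send(Msg::Run(task)).expect("worker is alive");
            }
        }
    }
}

/// Starts one named thread per entry in `names` and runs `task`, beginning on worker 0.
fn run_on_workers(names: &[&'static str], task: Task) {
    let (senders, receivers): (Vec<_>, Vec<_>) =
        names.iter().map(|_| mpsc::channel::<Msg>()).unzip();
    let mut handles = Vec::new();
    for (rx, name) in receivers.into_iter().zip(names.iter().copied()) {
        let senders = senders.clone();
        let builder = thread::Builder::new().name(name.to_string());
        handles.push(builder.spawn(move || worker_loop(rx, senders)).unwrap());
    }
    senders[0].send(Msg::Run(task)).expect("worker is alive");
    for handle in handles {
        handle.join().unwrap();
    }
}

fn record(log: &Mutex<Vec<String>>, step: &str) {
    let name = thread::current().name().unwrap_or("?").to_string();
    log.lock().unwrap().push(format!("{step} on {name}"));
}

async fn pipeline(log: Arc<Mutex<Vec<String>>>) {
    record(&log, "start");
    move_to(1).await; // everything after this line runs on worker 1
    record(&log, "download");
    move_to(2).await; // everything after this line runs on worker 2
    record(&log, "parse");
}

fn run_demo() -> Vec<String> {
    let log = Arc::new(Mutex::new(Vec::new()));
    run_on_workers(&["generator", "downloader", "parser"], Box::pin(pipeline(log.clone())));
    let steps = log.lock().unwrap().clone();
    steps
}
```

Calling `run_demo()` from `main` returns one entry per step, each tagged with the name of the worker thread it ran on. Note that `move_to` changes nothing "upstream" by itself: it only returns `Pending`, which propagates up to the task's top-level `poll`, and it is the executor that then decides where the next `poll` happens. The sketch cuts corners: it handles a single task, and any future that suspends for another reason (timers, IO) would need a real waker-based path instead of the `expect`.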
