Execute future in background using Tokio runtime?

I need something like wasm_bindgen_futures::spawn_local, that takes a Future<Output = ()> + 'static, run it in the background and throw it away without .awaiting for it.

I played with:

use tokio::{task, time::*};

#[tokio::main]
async fn main() {
    let local = task::LocalSet::new();
    local.run_until(async move {
        task::spawn_local(async move {
            sleep(Duration::from_millis(1_000)).await;
            println!("qux");
        }).await.unwrap();
    }).await;
    println!("foo");
}

Expected

foo
qux

Got:

qux
foo

Also tried:

use tokio::{task, time::*};

#[tokio::main]
async fn main() {
    let local = task::LocalSet::new();
    local.spawn_local(async move {
        sleep(Duration::from_millis(1_000)).await;
        println!("qux");
    });
    println!("foo");
}

You could use select! for this:

use tokio::time::{sleep, Duration};
use tokio::{pin, select};

#[tokio::main]
async fn main() {
    let main_task = async {
        println!("foo");
    };
    pin!(main_task);

    let background_task = async {
        sleep(Duration::from_millis(1_000)).await;
        println!("qux");
    };
    pin!(background_task);

    let mut main_task_complete = false;
    let mut background_task_complete = false;

    while !main_task_complete || !background_task_complete {
        select! {
            _ = &mut main_task, if !main_task_complete => {
                main_task_complete = true;
            }
            _ = &mut background_task, if !background_task_complete => {
                background_task_complete = true;
            }
        }
    }
}

Playground.

Or FuturesUnordered:

use tokio::time::{sleep, Duration};

use std::future::Future;
use std::pin::Pin;

use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let main_task = async {
        println!("foo");
    };

    let main_task: Pin<Box<dyn Future<Output = ()>>> = Box::pin(main_task);

    let background_task = async {
        sleep(Duration::from_millis(1_000)).await;
        println!("qux");
    };

    let background_task: Pin<Box<dyn Future<Output = ()>>> = Box::pin(background_task);

    let futures = FuturesUnordered::from_iter([main_task, background_task]);

    futures.collect::<()>().await;
}

Playground.

1 Like

The LocalSet can work too; you just have to use it correctly. The foreground work must be spawned too (or no concurrency can happen), and you must await everything at the end, not before you spawn the other work.

use tokio::{join, task, time::*};

#[tokio::main]
async fn main() {
    let local = task::LocalSet::new();
    local.run_until(async move {
        let t1 = task::spawn_local(async move {
            sleep(Duration::from_millis(1_000)).await;
            println!("qux");
        });
        let t2 = task::spawn_local(async move {
            println!("foo");
        });
        // Everything is started, so *now* we await completion.
        t1.await.unwrap();
        t2.await.unwrap();
    }).await;
}
3 Likes

Thanks, @jofas and @kpreid. In my case, however, the code at the top illustrates concurrency in a limited way.

I'd like to know if it's possible to "throwaway" a future:

pub fn background_animation_interval(callback: &(dyn Fn(Duration) + Send + Sync + 'static), period: Duration) -> BackgroundInterval {
    let mut stopped = Arc::new(RwLock::new(false));
    exec_future({
        let stopped = Arc::clone(&mut stopped);
        async move {
            let mut interval = animation_interval(period);
            interval.tick().await;
            loop {
                let delta = interval.tick().await;
                if *stopped.read().unwrap() {
                    break;
                }
                callback(delta);
            }
        }
    });
    BackgroundInterval {
        stopped,
    }
}

In which case this interval could be referenced in multiple places and stopped anywhere. I already have animation_interval and a default_interval functions which return a ticker that can be used to tick manually inside an asynchronous function, but its it possible to have a callback-based one too that can be stopped with a stop() through a reference somewhere?

If it's not possible, then I guess I'd have to remove this exec_future function from my utilities :frowning: and also these callback forms

This is what wasm_bindgen_futures::spawn_local does:

I need the same for Tokio for cross-platform support of exec_future

I'd use a mpsc channel to tell your background task to shut down, rather than using shared memory behind an RwLock:

use tokio::select;
use tokio::sync::mpsc::{self, Receiver};
use tokio::time::sleep;

use std::time::{Duration, Instant};

fn background_animation_interval(
    callback: impl Fn(Duration) + Send + Sync + 'static,
    mut rx: Receiver<()>,
) {
    tokio::spawn(async move {
        loop {
            let instant = Instant::now();
            select! {
                _ = sleep(Duration::from_millis(100)) => {
                    callback(instant.elapsed());
                }
                _ = rx.recv() => {
                    break;
                }
            }
        }
    });
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(1);

    background_animation_interval(|_| println!("qux"), rx);

    sleep(Duration::from_millis(1_000)).await;

    tx.send(()).await.unwrap();

    println!("done");
}

Playground.

The Sender part of the channel is clonable so you can stop it from anywhere by just passing clones around.

2 Likes

I believe you're looking for tokio::spawn. Note that you don't need to .await the JoinHandle returned by the function. It is polled as soon as it is spawned. It is different from the spawn_local as it is not local, but might be scheduled on another thread (which IIRC don't exist in WASM yet).

Edit: Example

use tokio::time::*; // 1.29.1


async fn background() {
    sleep(Duration::from_millis(1_000)).await;
    println!("qux");
}

#[tokio::main]
async fn main() {
    tokio::spawn(background());
    println!("foo");
    
    // If **main** finishes before the Duration in background, qux will not
    // mot be printed.
    sleep(Duration::from_millis(2_000)).await;
}

Playground

3 Likes

I'd like to know if it's possible to "throwaway" a future:

Under normal circumstances, this is done with tokio::task::spawn(), but that requires the future to be Send. If you want to work with !Send futures, then you can create one LocalSet in main() and run your entire application inside it; then you can use spawn_local() like you would spawn(). (Then it will make sense to configure the runtime to be current-thread since the other threads won't get used unless you have normal spawned tasks too.)

3 Likes

In most cases, I'd trigger shutdown on channel disconnect which leaves the main payload available for sending whatever control signals might be necessary. That way, the background task keeps running until there's no way to send it more commands.

fn background_animation_interval(
    callback: impl Fn(Duration) + Send + Sync + 'static,
    mut rx: Receiver<()>,
) {
    tokio::spawn(async move {
        loop {
            let instant = Instant::now();
            select! {
                _ = sleep(Duration::from_millis(100)) => {
                    callback(instant.elapsed());
                }
                msg = rx.recv() => {
                    let Some(msg) = msg else { break; };
                    eprintln!("Received message {msg:?}");
                }
            }
        }
    });
}
2 Likes

I thought of using thread_local!

// use std::borrow::Borrow;
use tokio::{task::LocalSet, time::*};

thread_local! {
    static LOCAL_SET: LocalSet = LocalSet::new();
}

#[tokio::main]
async fn main() {
    LOCAL_SET.with(|s| {
        s.spawn_local(async move {
            sleep(Duration::from_millis(1_000)).await;
            println!("qux");
        });
    });
    println!("foo");
    sleep(Duration::from_millis(3_000)).await;
}

I couldn't see qux being printed, but it's guaranteed to work in the core loop, right?

The LocalSet won't do anything unless it is driven, by calling run_until() or block_on(). In order to have futures running on a single thread, that thread has to run them. There is no “background of a thread” to run stuff in implicitly (that would be a different thread[1]); the thread has to be doing it.

That's why I said to put your entire application in the LocalSet. The sleep you have at the end of main, that is delaying the process exiting, is the thing that needs to also be run inside the LocalSet.

use tokio::{task::{LocalSet, spawn_local}, time::*};

#[tokio::main]
async fn main() {
    let s = LocalSet::new();
    s.run_until(inner_main()).await;
}

async fn inner_main() {
    spawn_local(async move {
        sleep(Duration::from_millis(1_000)).await;
        println!("qux");
    });
    
    println!("foo");
    sleep(Duration::from_millis(3_000)).await;
}

main() makes the LocalSet run until inner_main() completes, which won't happen till the sleep does. Then inner_main() and everything it calls gets to do all the spawn_local()s it wants to. No need for a thread-local or any other complexity.


  1. Or you can look at the LocalSet as the thing that creates the space for the possibility of a background ↩︎

5 Likes

Ahh, so I don't need a reference to the LocalSet created from the entry point? That's what I was wondering!

tokio::task::spawn_local() implicitly finds the enclosing LocalSet. And that's undoubtedly based on thread-local variables inside of Tokio. If you want, you can use LocalSet::spawn_local() to explicitly use the set rather than implicitly, but there is no particular point in creating your own thread-local since Tokio has one already.

The important difference between my code and your previous code is not that it doesn't have a thread_local; it's that all the application logic — all the “background” and “foreground” things — must be inside of the LocalSet::run_until() so that it can run concurrently. If you don't call run_until() or block_on() then the set doesn't do anything because it doesn't have control with which to actually run the futures, and as soon as those functions return, the set is no longer running the tasks spawned in it.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.