I'm having issues with the use of tokio::task::spawn_local
and tokio::task::LocalSet
. Examples I see on rust docs show a single local task being passed inside LocalSet::run_until
, but I need to call my !Send
async block not in main but behind multiple layers of functions and tokio::select
loops.
My first take was to initialize the LocalSet
in main and pass its reference all the way down to where my spawn_local
resides. Then I realized that the task does not get run until I drive the futures, so to try if it's going to work, I tried to run_until(pending())
.
The minimal example I tried to reproduce on rust playground seems to work to my surprise, but this approach does not work in my actual code - the task I spawn
alongside the local one works, but the spawn_local
task does not seem to get driven by the executor at all. Unfortunately I'm not allowed to share the whole repo here.
What is the correct way of passing the LocalSet
down the stack and getting it driven by tokio async runtime, along with other non-local tasks? local_set.run_until(pending())
looks like a dirty workaround to me, I'm pretty sure there's a best practice around this which is not what I did.
Here is my example on playground: Rust Playground
#![feature(negative_impls)]
fn never() -> futures_util::future::Pending<()> {
futures_util::future::pending::<()>()
}
use tokio::{
sync::mpsc::{channel, Receiver},
task::{JoinHandle, LocalSet, spawn},
time::{sleep, Duration}
};
struct NotSend;
impl !Send for NotSend {}
#[tokio::main]
async fn main() {
let local_set = tokio::task::LocalSet::new();
tokio::select! {
_ = my_func(&local_set) => { println!("my_func complete."); }
_ = local_set.run_until(never()) => {} // `local_spawn`ed task doesn't get polled unless I do this.
_ = sleep(Duration::from_secs(7)) => { println!("Killing tasks..."); }
// Omitted: Some other futures not necessarily in local set...
}
println!("Done");
}
async fn my_func(local_set: &LocalSet) {
let mut tasks = Tasks::default();
let mut receiver_1 = tasks.subscribe(true, local_set);
let mut receiver_2 = tasks.subscribe(false, local_set);
loop {
tokio::select! {
_ = receiver_1.recv() => { println!("1 received."); }
_ = receiver_2.recv() => { println!("2 received."); }
// Omitted: Some kill_task logic here...
}
}
}
#[derive(Default)]
struct Tasks {
inner: Vec<JoinHandle<()>>
}
impl Tasks {
fn subscribe(&mut self, task_type: bool, local_set: &LocalSet) -> Receiver<()> {
let (sender, receiver) = channel::<()>(1);
let handle = if task_type {
spawn(async move {
loop {
tokio::select! {
_ = sleep(Duration::from_secs(2)) => { let _ = sender.send(()).await; }
}
}
})
} else {
local_set.spawn_local(async move {
loop {
let _not_send = &mut NotSend; // Forces the use of `spawn_local`
tokio::select! {
_ = sleep(Duration::from_secs(3)) => { let _ = sender.send(()).await; }
}
}
})
};
self.inner.push(handle);
receiver
}
async fn _kill_all() {
// Omitted: Iterate through the vector and kill tasks.
}
}