Proper use of tokio LocalSet outside main

I'm having trouble with tokio::task::spawn_local and tokio::task::LocalSet. The examples in the Tokio docs show a single local task being passed to LocalSet::run_until, but I need to spawn my !Send async block not in main but several layers of functions and tokio::select! loops further down.

My first attempt was to create the LocalSet in main and pass a reference to it all the way down to where I call spawn_local. Then I realized that the spawned task does not run until the LocalSet itself is driven, so to see whether this would work at all, I tried awaiting local_set.run_until(pending()).

The minimal example I wrote on the Rust Playground works, to my surprise, but the same approach does not work in my actual code: the task I spawn alongside the local one runs, but the spawn_local task does not seem to be polled by the executor at all. Unfortunately, I'm not allowed to share the whole repo here.

What is the correct way to pass the LocalSet down the stack and have it driven by the Tokio runtime along with the other, non-local tasks? local_set.run_until(pending()) looks like a dirty workaround to me, and I'm fairly sure there is a best practice for this that I'm missing.

Here is my example from the Rust Playground:

// Nightly-only feature, required for the `impl !Send` below.
#![feature(negative_impls)]

fn never() -> futures_util::future::Pending<()> {
    futures_util::future::pending::<()>()
}

use tokio::{
    sync::mpsc::{channel, Receiver},
    task::{JoinHandle, LocalSet, spawn},
    time::{sleep, Duration}
};

struct NotSend;
impl !Send for NotSend {}

#[tokio::main]
async fn main() {
    let local_set = tokio::task::LocalSet::new();

    tokio::select! {
        _ = my_func(&local_set) => { println!("my_func complete."); }
        _ = local_set.run_until(never()) => {} // The `spawn_local`ed task doesn't get polled unless I do this.
        _ = sleep(Duration::from_secs(7)) => { println!("Killing tasks..."); }
        // Omitted: Some other futures not necessarily in local set...
    }

    println!("Done");
}

async fn my_func(local_set: &LocalSet) {
    let mut tasks = Tasks::default();
    let mut receiver_1 = tasks.subscribe(true, local_set);
    let mut receiver_2 = tasks.subscribe(false, local_set);
    
    loop {
        tokio::select! {
            _ = receiver_1.recv() => { println!("1 received."); }
            _ = receiver_2.recv() => { println!("2 received."); }
            // Omitted: Some kill_task logic here...
        }
    }
}

#[derive(Default)]
struct Tasks {
    inner: Vec<JoinHandle<()>>
}

impl Tasks {
    fn subscribe(&mut self, task_type: bool, local_set: &LocalSet) -> Receiver<()> {
        let (sender, receiver) = channel::<()>(1);
        let handle = if task_type {
            spawn(async move {
                loop {
                    tokio::select! {
                        _ = sleep(Duration::from_secs(2)) => { let _ = sender.send(()).await; }
                    }
                }
            })
        } else {
            local_set.spawn_local(async move {
                loop {
                    let _not_send = &mut NotSend; // Forces the use of `spawn_local`
                    tokio::select! {
                        _ = sleep(Duration::from_secs(3)) => { let _ = sender.send(()).await; }
                    }
                }
            })
        };
        self.inner.push(handle);
        receiver
    }
    
    async fn _kill_all() {
        // Omitted: Iterate through the vector and kill tasks.
    }
}
