How to correctly transfer a set of tasks running in one thread to another thread in tokio?

use std::sync::{Arc, Mutex};

use tokio::task::JoinSet;

async fn gen_task(join_set: &mut JoinSet<()>) {
    join_set.spawn(async move {
        loop {
            println!("running");
            tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
        }
    });
}

fn main() {
    let join_set: Arc<Mutex<Option<JoinSet<()>>>> =
        Arc::new(Mutex::new(Some(JoinSet::<()>::new())));
    let join_set_copy = Arc::clone(&join_set);
    let (tx, rx) = std::sync::mpsc::channel::<()>();
    std::thread::spawn(move || {
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(async {
                let mut mutex = join_set_copy.lock().unwrap();
                gen_task(mutex.as_mut().unwrap()).await;
                while let Ok(_) = rx.recv() {
                    break;
                }
                println!("transfer tasks to another thread");
                // The tasks can still be executed if the code is uncommented
                // while let Some(_) = mutex.as_mut().unwrap().join_next().await {
                //
                // }
            })
    });
    loop {
        let mut s = String::new();
        std::io::stdin().read_line(&mut s).unwrap();
        if s.trim() == "move" {
            tx.send(()).unwrap();
            break;
        }
    }
    let mut tasks = join_set.lock().unwrap().take().unwrap();
    let t2 = std::thread::spawn(move || {
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(async {
                while let Some(_) = tasks.join_next().await {}
                println!("tasks complete");
            })
    });
    t2.join().unwrap();
}

In this program, I create a set of tasks in one thread and run them until I enter the command move, which makes that thread exit; the tasks should then continue running in a new thread (with a new runtime). However, the tasks complete immediately in the new thread without actually being executed. How can I implement this idea correctly in tokio?

You seem to have a misconception about how async tasks and the tokio runtime work. Your example code makes no sense whatsoever. The tokio runtime uses a work-stealing scheduler: tasks (the top-level futures managed by the executor/scheduler) are not bound to any specific thread, and the scheduler takes care of them. You just spawn your task onto the runtime. That's also the reason your future needs a Send bound.

It's probably an XY problem. I don't understand what goal you are trying to achieve. Please describe the problem you are actually trying to solve, not just "I tried to solve a problem with this piece of code but it doesn't work".

I want to move a set of tasks from a thread with a lower priority to a thread with a higher priority, or something along those lines.

Tokio tasks are not bound to threads, but to a specific runtime context, that is, the runtime in which the task was spawned. Tasks cannot migrate between runtime contexts.

If you want to manually control which task runs on which thread (which I strongly recommend you don't), you must use a single-threaded runtime (i.e. RuntimeFlavor::CurrentThread), manually create one runtime for each worker thread and run it by calling block_on(), and manually spawn tasks in whichever runtime you want. However, once spawned, tasks cannot be moved to a different runtime (and here each runtime is tied to one thread).

If you want to use multiple threads and balance load among them, you should use the multi-thread runtime, which spawns worker threads on its own. You cannot bind the runtime to threads you create, except for the thread that is blocked on the root task of the runtime. You can, however, configure how many worker threads the runtime spawns. Since the multi-thread runtime uses a work-stealing scheduler, tasks might migrate between worker threads at await points, but that is managed by the scheduler and out of your control.

If you don't know what you are doing, just use a default multi-thread runtime or the #[tokio::main] attribute. Only tune your runtime configuration when you understand the consequences, because your program will do bad things (crash, deadlock, etc.) if misconfigured. And you must always spawn your task in the correct runtime.

When deciding to fine-tune the runtime, common questions to ask include:

  • how many IO-bound tasks there are
  • how many CPU-bound tasks there are
  • how many CPU cores to use
  • which task(s) need a timer
  • which runtime should enable IO
  • and many other app-specific requirements