Can anyone help me find out why `handle_task` produces no output?

use std::thread;
use tokio::runtime::Builder;
use tokio::spawn;
use tokio::sync::mpsc;

fn main() {
    let spawner = TaskSpawner::new();

    spawner.spawn_task(Task {
        name: String::from("test"),
    });
}

pub struct Task {
    name: String,
}

async fn handle_task(task: Task) {
    println!("Got task {:?}", task.name);
}

#[derive(Clone)]
pub struct TaskSpawner {
    spawn: mpsc::Sender<Task>,
}

impl TaskSpawner {
    pub fn new() -> Self {
        let (send, mut recv) = mpsc::channel::<Task>(16);

        let rt = Builder::new_current_thread().enable_all().build().unwrap();

        thread::spawn(move || {
            rt.block_on(async move {
                while let Some(task) = recv.recv().await {
                    spawn(handle_task(task));
                }
            });
        });

        Self { spawn: send }
    }

    pub fn spawn_task(self, task: Task) {
        match self.spawn.blocking_send(task) {
            Ok(()) => {}
            Err(_) => panic!("The shared runtime has shut down."),
        }
    }
}


Your main seems to exit earlier than the spawned runtime thread.

If you modify the TaskSpawner so that it joins on drop, then the task is printed correctly, but main never exits, since the receiver keeps waiting for new tasks forever.


Thanks for the help! After the change, handle_task is called correctly, but it seems the main thread never exits on its own anymore. Is this expected?

That's exactly what I meant by "main never exits, since the receiver keeps waiting for new tasks forever."

I think you should probably re-organize your code, because it can be tricky to solve this with the current setup.

This happens because you are using a single-threaded runtime: the runtime blocks on a loop that only polls the channel, so the other scheduled tasks never get polled.

You can either use a multi-threaded runtime, or handle the dispatch properly in the loop. One method is to spawn the loop as a task, then block on its join handle:

	let runner = rt.spawn(async move {
		while let Some(task) = recv.recv().await {
			println!("rt received task");
			spawn(handle_task(task));
		}
	});
	rt.block_on(runner).unwrap();

Alternatively, you can use a JoinSet and, inside the loop, select between the channel and the join set. But tokio::select! is quite cumbersome to use, so I won't bother to give an example here.

You might also want to wait a little in the main thread so that it doesn't exit too early:

fn main() {
	let spawner = TaskSpawner::new();
	spawner.spawn_task(Task {
		name: String::from("test"),
	});
	std::thread::sleep(std::time::Duration::from_millis(1));
}

The code comes from Tokio's documentation; I'm learning how to run async code from a sync context.

In general, if you want a program that exits when its tasks are complete, your main() has to know to do that, and it needs some way of knowing what those tasks are so that it can wait for them. The reason this isn't automatic is that there are often background threads or tasks which shouldn't delay the program exiting, because they just run indefinitely until exit (e.g. thread pools).
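The same distinction shows up with plain threads, so here is a small std-only sketch of it. `run_jobs` is a hypothetical helper: it starts one background thread that is never joined and several workers whose handles are kept so the caller can wait for them.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs one worker thread per name and waits for all
/// of them, while a background thread is left detached on purpose.
fn run_jobs(names: Vec<String>) -> Vec<String> {
    // Background thread: loops forever and is never joined. It simply ends
    // when the process exits, so it cannot delay shutdown.
    thread::spawn(|| loop {
        thread::sleep(Duration::from_millis(50));
    });

    // Worker threads: keep every handle so that we know what to wait for.
    let workers: Vec<thread::JoinHandle<String>> = names
        .into_iter()
        .map(|name| thread::spawn(move || format!("Got task {:?}", name)))
        .collect();

    // Joining is the explicit "wait for these tasks" step.
    workers
        .into_iter()
        .map(|worker| worker.join().expect("worker panicked"))
        .collect()
}

fn main() {
    for line in run_jobs(vec![String::from("test")]) {
        println!("{line}");
    }
}
```

Nothing here waits for the background thread, and nothing needs to: main returns as soon as the joined workers are done.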
