What's the best way to do one-to-many task communication?

Hi all,

I have an app that spawns Tokio tasks dynamically, on demand (i.e. the number of tasks is not known up front). They are all very long-lived, although any of them can end successfully at any moment. I want the ability to gracefully shut them all down and wait for them to finish. What I've come up with is pretty hacky, and I'm wondering whether I'm missing something more idiomatic.

There are two parts to this: I need to be able to send a one-to-many message (a "shutdown request"), and then wait for every task to say "OK, I'm done now". So it's a one-shot, one-to-many message with an acknowledgement.

There is the user's code, an overseer process, and then each of the tasks. The overseer and every task are spawned off, so from that point on the user can only communicate with them through shared-memory primitives.

I have an mpsc channel that allows the tasks to communicate back to their overseer process. I thought I could just grab another of those Senders and give it to the user, who could close the channel and thus signal everybody to start cleaning up... but it looks like only the Receiver can close the channel. So I ended up sharing an AtomicBool between the user and each of the tasks, so that they can all check whether it has been set to true and then start to shut down gracefully. But an AtomicBool feels quite technical because of all the memory-ordering parameters, which seem much lower level than I need, and it gives me no way to know when the work is complete.

To be able to wait for all the work to finish, I have a RwLock<()> (in an Arc) that I lock with .read() when spawning each task:

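            tokio::spawn(async move {
                // Bind the guard so it is held for the whole task; a bare
                // `outstanding_work.read().await;` drops it immediately.
                let _guard = outstanding_work.read().await;
                // ... my long-lived task here ...
            });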

so it is basically doing dynamic latch counting (all the latch counters I could find in the std and Tokio docs required a fixed count up front). Then, after issuing the shutdown request (through the AtomicBool), I can await a write() lock, which waits until all the spawned tasks have finished.

But, yeah, this all feels very hacky... is there a more idiomatic way to do this? I could also consider creating a oneshot channel for each task I spawn (to receive the shutdown request; it gets closed automatically when the task, and with it the receiver, dies), but then I'd need to keep track of all the channels in my overseer process. That's awkward, because it's part of normal processing for a task to finish and a new one to be created.

I wrote this article about shutdown in Tokio tasks. It recommends a tool called CancellationToken.


Oooh, this is just too perfect, thank you! I'll have a look inside; I wonder if it's just internally wrapping an atomic bool and something like a RwLock for dynamic latching :laughing:

"cancel" is the keyword I was missing; I was focussing too much on looking for things with "count" or "shutdown" in their names (also, I didn't know about tokio_util, thanks for that).

You can find the internal data structures here.

struct TreeNode {
    inner: Mutex<Inner>,
    waker: tokio::sync::Notify,
}

struct Inner {
    parent: Option<Arc<TreeNode>>,
    parent_idx: usize,
    children: Vec<Arc<TreeNode>>,
    is_cancelled: bool,
    num_handles: usize,
}

The children Vec exists to support the child_token feature.
