tokio::select! without cancellation

Hi,

I need to start two external processes that communicate with each other. Each of these processes is started by a Future that already has a timeout and a kill_switch set up. Now, either of them might fail early, or simply exit, while the other is still waiting for something to happen (or for the timeout to expire):

let send_handle = tokio::spawn(run_sender(timeout, kill_switch.clone()));
let recv_handle = tokio::spawn(run_receiver(timeout, kill_switch));

After both have finished (or died, or been cancelled), I need the result of both the sender and the receiver, because each result contains the output of its program. But if one of them dies while the other is waiting for it, I don't want to wait until the timeout has expired. Using tokio::select! does not work, as it drops the future that does not complete. And simply awaiting one of them (let's say the sender) only works if the sender is the one that crashes, not if the receiver dies.

Is there an easy way of waiting for the first one of them (like in tokio::select!) but getting the output of both of them?

This sounds like something inherently contradictory. How can you get the output of a future without waiting for it to finish?

If you do in fact need to wait for both to finish (in whatever way: failure, early exit, timeout, successful exit), just join! them.

Thanks for the answer! Maybe I can explain it a bit better: The timeout or the kill_switch will simply terminate the process that is running inside of the future, and return the stdout and stderr of the program (until it was killed).

So my idea is that as soon as the first one of them terminates, I want to use the kill_switch to terminate the other one too. Using join! will not work because which one would I join first?

I guess one idea would be to change the kill_switch: Right now, I am using a tokio::sync::watch::Receiver (the receiving half of a watch channel) to send events to the futures (the sender and the receiver). I could change it to a tokio::sync::broadcast::channel, and pass both a broadcast::Sender and a broadcast::Receiver to each of run_sender and run_receiver, so that whichever one stops first can kill the other from inside. Then I can join on both of them. I guess that should work.

But do you know if there exists something a bit more expressive and understandable in tokio?

If you need to wait for one of them to finish without canceling the other, you can use a FuturesUnordered or a StreamMap. I ran into a similar situation with a program that communicates with a sync loop.

I went with a fork of the old Rust std Semaphore, augmented with try_acquire, for the kill switch, as then you don't have to deal with channel failure...

The jumping off point to look at is probably:

with other stuff in semaphore.rs and streams.rs, which is there to get these streams of different types into a single map. This could be a lot simpler if you don't have to deal with streams of different types, though.

Not sure if it is any good, as it is my first async program. But if it isn't, I hope someone knows a better way :smiley:

You can use a tokio::select! without consuming the JoinHandle by passing a mutable reference to the JoinHandle. For example, to abort the other task when one of them completes, you could do this:

let out = tokio::select! {
    out1 = &mut jh1 => { jh2.abort(); out1 },
    out2 = &mut jh2 => { jh1.abort(); out2 },
};

Be careful though. If you .await or select on a JoinHandle after it has already returned its output, it will panic.


@ratmice Note that Tokio provides a Semaphore that can be used in async applications.


nod I should have mentioned that, but its acquire is async, which made it difficult to use in my case.

Well, you shouldn't be using a semaphore for shutdown in the first place. Check out this tutorial on the Tokio website.

Yeah, that is what I started with, and it turned into a complete mess with the mixture of sync and async threads shutting down, and the need for mpmc channels which are async on one end and used sync on the other.

Just using a semaphore (non-blockingly, so acquire on startup, release to shutdown): each loop then tries to acquire it without blocking, and shuts down if that succeeds.

This simplified shutdown mechanism removed a ton of paths which otherwise couldn't error except when the shutdown channel has closed. Anyhow, we're diverging a bit from OP's original question, but I'm very happy with the way I did it over the mechanism in that tutorial.
