Can't explain different behavior between similar task group implementations

I've been fiddling around with writing variations on task group-like structures lately, and I've somehow created two very similar implementations where one implementation fails some tests that the other succeeds at.

The implementations are tokio-worker-map, for mapping values through an asynchronous function on multiple worker tasks, and tokio-worker-nursery, which spawns futures on worker tasks and streams the results. Both work the same way: handles send input values (function arguments for worker-map, internally boxed futures for worker-nursery) over an mpmc channel from the async-channel crate to a set of pre-spawned worker tasks. The workers await the operations and send the results over an unbounded Tokio mpsc channel, from which a stream handle receives them.

Because I thought it would be a neat idea, I gave both implementations methods for "shutting down": closing the input channels and preventing any queued inputs from being processed. The shutdown tests pass for worker-map, but for worker-nursery they fail because no outputs make it to the output stream, seemingly because the shutdown takes effect before the workers start awaiting the first round of inputs.

Aside from the external API, the primary differences between the two implementations are:

  • worker-map uses a bounded async-channel for the input channel, while worker-nursery uses an unbounded channel.

  • worker-map uses Sender::send() (an async method) to send values to the workers, while worker-nursery uses Sender::try_send() (synchronous).

Even if I adjust worker-nursery to match worker-map on these points, the shutdown tests still fail.

I suspect the failure has something to do with nondeterminism around AtomicBool, but then why are the results so consistent on both my Intel MacBook Pro and GitHub Actions' Ubuntu, macOS, and Windows runners? Is there a simple way to guarantee the tests succeed for both implementations?

diff --git a/crates/tokio-worker-nursery/src/lib.rs b/crates/tokio-worker-nursery/src/lib.rs
index f8b4d21..70d752c 100644
--- a/crates/tokio-worker-nursery/src/lib.rs
+++ b/crates/tokio-worker-nursery/src/lib.rs
@@ -504,6 +504,7 @@ mod tests {
             nursery.spawn(async move { rx.await.unwrap() }).unwrap();
             txes.push(tx);
         }
+        tokio::task::yield_now().await;
         assert_eq!(stream.try_recv(), Err(TryRecvError::Empty));
         stream.shutdown();
         for (i, tx) in txes.into_iter().enumerate() {
@@ -523,6 +524,7 @@ mod tests {
             nursery.spawn(async move { rx.await.unwrap() }).unwrap();
             txes.push(tx);
         }
+        tokio::task::yield_now().await;
         assert_eq!(stream.try_recv(), Err(TryRecvError::Empty));
         nursery.shutdown();
         for (i, tx) in txes.into_iter().enumerate() {

The test doesn't have any await points that would allow the worker tasks to make progress. Throwing in an explicit yield gives them a fighting chance. The worker-map tests don't have this problem because they use sender.send(rx).await prior to shutting down.
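To see why the yield matters, here is a toy model using only std. It is a hand-rolled single-threaded executor that polls the test body first and a worker second, roughly the way the default current-thread runtime behind #[tokio::test] would. Input, try_send, shutdown, recv, yield_once and simulate are hypothetical stand-ins, not the async-channel or Tokio APIs, and the executor busy-polls instead of relying on wakers.

```rust
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; the toy executor below simply re-polls in a loop.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical single-threaded stand-in for the input channel.
#[derive(Clone, Default)]
struct Input {
    queue: Rc<RefCell<VecDeque<u32>>>,
    closed: Rc<Cell<bool>>,
}

impl Input {
    /// Like try_send(): synchronous, so it never gives other tasks a turn.
    fn try_send(&self, v: u32) {
        self.queue.borrow_mut().push_back(v);
    }
    /// Closes the channel and discards anything still queued.
    fn shutdown(&self) {
        self.closed.set(true);
        self.queue.borrow_mut().clear();
    }
    /// None once closed, Some(v) if an item is queued, otherwise Pending.
    fn recv(&self) -> impl Future<Output = Option<u32>> + '_ {
        poll_fn(move |_cx| {
            if self.closed.get() {
                Poll::Ready(None)
            } else if let Some(v) = self.queue.borrow_mut().pop_front() {
                Poll::Ready(Some(v))
            } else {
                Poll::Pending
            }
        })
    }
}

/// Returns Pending exactly once, like a minimal yield_now().
async fn yield_once() {
    let mut yielded = false;
    poll_fn(move |_cx| {
        if yielded {
            return Poll::Ready(());
        }
        yielded = true;
        Poll::Pending
    })
    .await
}

/// Runs the toy test and returns the inputs the worker picked up before shutdown.
fn simulate(yield_before_shutdown: bool) -> Vec<u32> {
    let input = Input::default();
    let picked: Rc<RefCell<Vec<u32>>> = Rc::default();
    let worker = {
        let (input, picked) = (input.clone(), picked.clone());
        async move {
            while let Some(v) = input.recv().await {
                picked.borrow_mut().push(v);
            }
        }
    };
    let test_body = async move {
        for i in 0..3 {
            input.try_send(i); // no await point between sending and shutdown
        }
        if yield_before_shutdown {
            yield_once().await;
        }
        input.shutdown();
    };
    // Poll the test body first, then the worker, until both have finished.
    let mut tasks: Vec<Pin<Box<dyn Future<Output = ()>>>> = Vec::new();
    tasks.push(Box::pin(test_body));
    tasks.push(Box::pin(worker));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    while !tasks.is_empty() {
        tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
    }
    let result = picked.borrow().clone();
    result
}

fn main() {
    println!("without yield: {:?}", simulate(false)); // without yield: []
    println!("with yield: {:?}", simulate(true)); // with yield: [0, 1, 2]
}
```

Without the yield, the test body sends and shuts down within a single poll, so the worker only ever sees a closed channel. Because the polling order is fixed, the outcome is identical on every run, which would also be consistent with the failure being so reproducible across machines and CI runners.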


That works, but then why do the tests still fail if I change WorkerNursery::send() to use Sender::send(), making it async?

Because not every await yields back to the runtime. What makes an async function non-blocking is yielding, not the await itself: control only returns to the runtime when the awaited Future returns Poll::Pending.

For instance, tokio::time::sleep(Duration::from_millis(0)).await will yield, but std::future::ready(()).await will not. The difference is that sleep() schedules a wakeup with the runtime.
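A small std-only sketch of that distinction: polls_to_complete (a hypothetical helper) polls a future by hand with a no-op waker and counts how many polls it needs, and YieldOnce is a hypothetical stand-in for tokio::task::yield_now() that returns Poll::Pending once. An async block that awaits ready() three times finishes on its very first poll, so a runtime would never get a chance to run another task in between.

```rust
use std::future::{ready, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; we only poll by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for tokio::task::yield_now(): returns Pending exactly once.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref(); // ask the executor to poll us again
            Poll::Pending
        }
    }
}

/// Polls `fut` to completion by hand and returns how many polls it took.
fn polls_to_complete<F: Future>(fut: F) -> usize {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if fut.as_mut().poll(&mut cx).is_ready() {
            return polls;
        }
    }
}

fn main() {
    // Every await here is on an already-ready future: no Poll::Pending, no yield.
    let ready_only = polls_to_complete(async {
        ready(()).await;
        ready(()).await;
        ready(()).await;
    });
    // A single await on a future that returns Pending hands control back once.
    let with_yield = polls_to_complete(async {
        YieldOnce { yielded: false }.await;
    });
    // Prints: ready() x3: 1 poll(s); yield once: 2 poll(s)
    println!("ready() x3: {ready_only} poll(s); yield once: {with_yield} poll(s)");
}
```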

An async_channel::Send future only yields under certain conditions, such as when the channel is full (yours is unbounded, so that can't happen). So this await behaves like the ready() future: it completes on its first poll without giving the workers a chance to run.
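The same idea applied to sending, again as a std-only toy: Chan is a hypothetical channel (not async-channel's actual implementation) whose send future returns Poll::Pending only while a bounded queue is full. Sending three items completes without yielding on an unbounded Chan and on a bounded Chan with spare room; only a bounded Chan that actually fills up (capacity 2) yields.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; we only poll by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical toy channel; cap == None means unbounded.
struct Chan {
    queue: RefCell<VecDeque<u32>>,
    cap: Option<usize>,
}

impl Chan {
    fn new(cap: Option<usize>) -> Self {
        Chan { queue: RefCell::default(), cap }
    }

    /// Waits (returns Pending) only while the channel is full.
    async fn send(&self, v: u32) {
        poll_fn(|_cx| {
            let mut queue = self.queue.borrow_mut();
            if matches!(self.cap, Some(cap) if queue.len() >= cap) {
                Poll::Pending // full: the only case in which this send yields
            } else {
                queue.push_back(v);
                Poll::Ready(())
            }
        })
        .await
    }
}

/// Polls once and reports whether awaiting `fut` would have yielded to the runtime.
fn yields_on_first_poll<F: Future>(fut: F) -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    fut.as_mut().poll(&mut cx).is_pending()
}

/// Sends 0, 1, 2 from one async block, like a test that sends before shutting down.
fn send_three(chan: &Chan) -> bool {
    yields_on_first_poll(async {
        for i in 0..3 {
            chan.send(i).await;
        }
    })
}

fn main() {
    println!("unbounded yields: {}", send_three(&Chan::new(None))); // false
    println!("bounded(3) yields: {}", send_three(&Chan::new(Some(3)))); // false
    println!("bounded(2) yields: {}", send_three(&Chan::new(Some(2)))); // true
}
```

If the real Send behaves this way, it would also explain why switching worker-nursery to a bounded channel with async send() didn't help on its own: unless the test actually fills the channel, send().await still completes without yielding.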
