New crate announcement – “spawn_groups”

Looking for a structured concurrency construct for awaiting asynchronous tasks and their results? This crate is for you. Please review, criticisms and suggestions are allowed

It would help if you put more about the crate in the title of the thread.

1 Like

Congratulations on you first published crate. I don't know TaskGroup from swift and don't quite get which problem is solved. I think frameworks already have answers for the problem of how to structure tasks in an async runtime, like any server implementation running on an async runtime does it.

Did you probably implement tokio's JoinSet?

Yes & I didn't know it existed already. Main difference is that mine isn't runtime agnostic.

True, so now you can have a look how they implemented it maybe learn :slight_smile:

Yes, implemented it already :sweat_smile: & it is multithreaded too ie using the number of your CPU core as the number of thread to use.

I gave a quick look at your crate, I was expecting something quite self contained and was very surprised to see it internally contains an implementation of a whole async runtime, and two different AsyncStreams! Why does it need to contain an async runtime, especially when I need another runtime to just executing the with_spawn_group function (due to it being async)? And then in other places you use futures_lite::future::block_on, yet another runtime. I would suggest you to either make this a fully fledged runtime that can work without needing other runtimes (both internally and on the user side), or to integrate it better with other runtimes (e.g. by using their own task spawning mechanism) and removing the integrated one.


There are two unsound implementation of Send for the AsyncStreams which don't require ItemType: Send, even though they should because they indeed allow sending ItemTypes across threads. Luckily these types aren't exposed to the end user so it doesn't seem exploitable, but it seems an unnecessary use of unsafe. It also creates a weird situation where SpawnGroup is always Send even when ValueType is not, even though SpawnGroup requires ValueType: Send to even be defined (by the way it's idiomatic to require bounds on generic parameters only in the impls that need them and not in the type definition).


The API for with_spawn_group (and likely the other variants) looks a bit weird:

  • why does it require passing a parameter like i64::TYPE (which in turn also requires importing the GetType trait)? Couldn't the type just be inferred, or specified with turbofish if needed?

  • why does it need to be passed a closure? Normally this is because the closure needs to do some bookkeeping or stuff like that, but it seems to just immediately call it with the SpawnGroup.

To show what I mean, consider this code, which compiles:

let mut group = with_spawn_group(PhantomData, |group| async move { group }).await;
group.spawn_task(Default::default(), async { 1 });

I don't spawn tasks inside the with_spawn_group closure, instead I just get the group out (which generally is either wrong, because it needed the closure, or it means the closure is not needed). Moreover I don't need to import GetType and pass SpawnGroup::<i32>::TYPE to with_spawn_group, instead I just pass a PhantomData without specifying its generic parameter and let the compiler figure it out.


SpawnGroup's drop blocks the thread waiting for all the tasks to finish, but since it will most likely be dropped inside a future this will block the future, which is generally considered bad practice. To solve this you can either let the futures continue executing even after the SpawnGroup is dropped, or use the closure API to do an async wait all after the closure is called. This second option however requires the SpawnGroup future to not escape the closure, which means adding a lifetime which will likely bring problems to the caller. Not sure what the best solution is here.


The implementations of Deref with target dyn Stream seem unneeded. Just implement Stream and let the user know, they will still be able to import StreamExt and use its methods on your types. This also holds for internal types like AsyncStream.


Finally, you should provide some comparison with existing alternatives. tokio's JoinSet has already been mentioned, but I would also add FuturesOrdered/FuturesUnordered from the futures crate. They poll futures concurrently (not in parallel) and allow the user to spawn their own tasks, for example with tokio::spawn, which returns a Future that can be pushed into them.

4 Likes

Thanks for the suggestion :innocent:

I did this because the stream was sent across threads

This isn't for the compiler but for the crate user to know which type is being used

Like the documentation stated, group shouldn't be used or even passed outside the closure

I really thought this through. Any code after the function should wait for the function to finish or by explicitly calling dont_wait_at_drop method which doesn't block the calling thread while it is being dropped

Thanks for the suggestion

Thanks. I'll update the documentation in the next version.
All I can say is thanks @SkiFire13 for above :blush::innocent:

That's not a valid reason to use unsafe, you need to argue why that is safe, that is why your use of unsafe can't cause undefined behaviour. The way I see it the situation can be one of these two:

  • You are sending an AsyncStream<ItemType> across threads, where ItemType implements Send, in which case your implementation is not needed because the compiler already generates a impl<ItemType: Send> Send for AsyncStream<ItemType> for you and your code would work anyway

  • You are sending AsyncStream<ItemType> across threads, where ItemType doesn't implement Send, in which case you are explicitly breaking a rule of the language and your implementation is unsound. You should never do this.

As a crate user, if I want to make sure that the function is using a specific type I would just specify it with the turbofish notation, for example with_spawn_group::<_, _, i64, _>(/* ... */) (the generic arguments could be reordered by moving ResultType first, which would make this easier to type).

Ok but:

  • implementation-wise this doesn't matter at all
  • there's nothing preventing the user from moving it outside the closure

So why is a closure needed?

1 Like

For creating a scope where you can spawn tasks & wait for all, just like std::thread::scope

It would be cumbersome to write it out that way I presumed :man_shrugging:

Valid point. Would correct it on that next version

std::thread::scope using a closure is a limitation, not a feature. It needs to do so because just implementing Drop for Scope won't be enough for safety due to Scope potentially being leaked (and thus not running its drop function), which in turn is required because it allows spawning threads with non-'static closures. This is not the case for your crate because you don't allow spawning non-'static futures (and that's not even possible to do with async).

Forcing everyone i64::TYPE is also not optimal...

Okay thank for the suggestions :smiling_face:

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.