Looking for a structured concurrency construct for awaiting asynchronous tasks and their results? This crate is for you. Please review it; criticisms and suggestions are welcome.
Congratulations on your first published crate. I don't know TaskGroup from Swift and don't quite get what problem is being solved. I think frameworks already have answers to the problem of how to structure tasks in an async runtime; any server implementation running on an async runtime does this.
I gave your crate a quick look. I was expecting something quite self-contained and was very surprised to see it internally contains an implementation of a whole async runtime, and two different AsyncStreams! Why does it need to contain an async runtime, especially when I need another runtime just to execute the with_spawn_group function (due to it being async)? And then in other places you use futures_lite::future::block_on, yet another runtime. I would suggest you either make this a fully fledged runtime that can work without needing other runtimes (both internally and on the user side), or integrate it better with other runtimes (e.g. by using their own task spawning mechanism, as sketched below) and remove the integrated one.
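To illustrate that last option, here is a rough sketch of what "using their own task spawning mechanism" could look like: the caller supplies the spawner, so the crate never needs a bundled runtime or block_on. The Spawner trait and TokioSpawner type are hypothetical names for this sketch, not part of your crate:

```rust
use std::future::Future;

// Hypothetical abstraction over "whatever runtime the user already has".
trait Spawner {
    fn spawn<F>(&self, fut: F)
    where
        F: Future<Output = ()> + Send + 'static;
}

// Adapter for tokio; analogous impls could wrap async-std, smol, etc.
struct TokioSpawner;

impl Spawner for TokioSpawner {
    fn spawn<F>(&self, fut: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        tokio::spawn(fut);
    }
}
```

A SpawnGroup could then be generic over such a Spawner and stay runtime-agnostic, instead of embedding its own executor.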
There are two unsound implementations of Send for the AsyncStreams which don't require ItemType: Send, even though they should, because they do allow sending ItemTypes across threads. Luckily these types aren't exposed to the end user, so it doesn't seem exploitable, but it is an unnecessary use of unsafe. It also creates a weird situation where SpawnGroup is always Send even when ValueType is not, even though SpawnGroup requires ValueType: Send to even be defined (by the way, it's idiomatic to require bounds on generic parameters only in the impls that need them, not in the type definition).
The API for with_spawn_group (and likely the other variants) looks a bit weird:
- why does it require passing a parameter like i64::TYPE (which in turn also requires importing the GetType trait)? Couldn't the type just be inferred, or specified with turbofish if needed?
- why does it need to be passed a closure? Normally this is because the closure needs to do some bookkeeping or similar, but it seems to just immediately call it with the SpawnGroup.
To show what I mean, consider this code, which compiles:
```rust
let mut group = with_spawn_group(PhantomData, |group| async move { group }).await;
group.spawn_task(Default::default(), async { 1 });
```
I don't spawn tasks inside the with_spawn_group closure; instead I just get the group out (which generally means either this usage is wrong because the closure was actually needed, or the closure isn't needed at all). Moreover, I don't need to import GetType and pass SpawnGroup::<i32>::TYPE to with_spawn_group; instead I just pass a PhantomData without specifying its generic parameter and let the compiler figure it out.
SpawnGroup's Drop blocks the thread waiting for all the tasks to finish, but since it will most likely be dropped inside a future, this will block the executor's thread, which is generally considered bad practice. To solve this you can either let the futures continue executing even after the SpawnGroup is dropped, or use the closure API to do an async wait-all after the closure is called (see the sketch below). This second option however requires the SpawnGroup to not escape the closure, which means adding a lifetime that will likely bring problems to the caller. I'm not sure what the best solution is here.
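To make the second option concrete, here is a minimal sketch of the shape I mean, using tokio's JoinSet as a stand-in for SpawnGroup (the scoped_group name is made up). The closure only gets the group by &mut, so it cannot escape, and the wait-all happens with .await after the closure instead of blocking a thread in Drop:

```rust
use tokio::task::JoinSet;

// Hypothetical helper: lend the group to the closure, then await all
// results asynchronously once the closure has finished spawning.
async fn scoped_group<R, F>(body: F) -> Vec<R>
where
    R: Send + 'static,
    F: FnOnce(&mut JoinSet<R>),
{
    let mut group = JoinSet::new();
    body(&mut group);

    let mut results = Vec::new();
    while let Some(res) = group.join_next().await {
        results.push(res.expect("task panicked"));
    }
    results
}

#[tokio::main]
async fn main() {
    let results = scoped_group(|group| {
        for i in 0..3 {
            group.spawn(async move { i * 2 });
        }
    })
    .await;
    println!("{results:?}");
}
```

Note that making the closure synchronous sidesteps the lifetime problem at the cost of not being able to await while spawning; with an async closure you would indeed run into the lifetime issues that get pushed onto the caller.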
The implementations of Deref with target dyn Stream seem unneeded. Just implement Stream directly and let the user know; they will still be able to import StreamExt and use its methods on your types. This also holds for internal types like AsyncStream.
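For example, something along these lines (a sketch with a hypothetical channel-backed SpawnGroup, not your actual internals):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::channel::mpsc;
use futures::stream::{Stream, StreamExt};

// Hypothetical: results of finished tasks arrive on a channel.
struct SpawnGroup<ValueType> {
    results: mpsc::UnboundedReceiver<ValueType>,
}

impl<ValueType> Stream for SpawnGroup<ValueType> {
    type Item = ValueType;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // UnboundedReceiver is Unpin, so we can just re-pin it and delegate.
        Pin::new(&mut self.get_mut().results).poll_next(cx)
    }
}

async fn consume(mut group: SpawnGroup<i32>) {
    // No Deref to `dyn Stream` needed: StreamExt::next is available
    // because SpawnGroup implements Stream.
    while let Some(value) = group.next().await {
        println!("task finished with {value}");
    }
}
```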
Finally, you should provide some comparison with existing alternatives. tokio's JoinSet has already been mentioned, but I would also add FuturesOrdered/FuturesUnordered from the futures crate. They poll futures concurrently (not in parallel) and allow the user to spawn their own tasks, for example with tokio::spawn, which returns a Future that can be pushed into them.
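For reference, the FuturesUnordered + tokio::spawn combination looks roughly like this (the tasks run in parallel on tokio's worker threads, while FuturesUnordered awaits their JoinHandles concurrently):

```rust
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let mut handles = FuturesUnordered::new();
    for i in 0..3 {
        // tokio::spawn returns a JoinHandle, which is itself a Future,
        // so it can be pushed into the FuturesUnordered.
        handles.push(tokio::spawn(async move { i * 10 }));
    }

    // Results arrive in completion order; FuturesOrdered would instead
    // preserve the order in which the handles were pushed.
    while let Some(result) = handles.next().await {
        println!("got {}", result.expect("task panicked"));
    }
}
```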
I did this because the stream was sent across threads
This isn't for the compiler but for the crate user to know which type is being used
As the documentation states, the group shouldn't be used or even passed outside the closure.
I really thought this through. Any code after the function should wait for it to finish, unless the dont_wait_at_drop method is explicitly called, which avoids blocking the calling thread while the group is being dropped.
Thanks for the suggestion
Thanks. I'll update the documentation in the next version.
All I can say is thanks @SkiFire13 for the above.
That's not a valid reason to use unsafe. You need to argue why it is safe, that is, why your use of unsafe can't cause undefined behaviour. The way I see it, the situation can be one of these two:
1. You are sending an AsyncStream<ItemType> across threads, where ItemType implements Send, in which case your implementation is not needed, because the compiler already generates an impl<ItemType: Send> Send for AsyncStream<ItemType> for you and your code would work anyway.
2. You are sending an AsyncStream<ItemType> across threads, where ItemType doesn't implement Send, in which case you are explicitly breaking a rule of the language and your implementation is unsound. You should never do this (a short sketch of why follows below).
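To make the second case concrete, here is a sketch (with a stripped-down stand-in for your AsyncStream) of why the unconditional impl is unsound: it lets a !Send type like Rc cross a thread boundary, and the compiler happily accepts it:

```rust
use std::rc::Rc;

// Stripped-down stand-in for the crate's internal stream type.
struct AsyncStream<ItemType> {
    buffer: Vec<ItemType>,
}

// The problematic pattern: claiming Send unconditionally,
// without requiring `ItemType: Send`.
unsafe impl<ItemType> Send for AsyncStream<ItemType> {}

fn main() {
    // Rc is deliberately !Send: its reference count is not atomic.
    let stream = AsyncStream { buffer: vec![Rc::new(1)] };
    let still_here = stream.buffer[0].clone();

    // This compiles only because of the unconditional impl above: the Rc
    // inside `stream` is moved to another thread while `still_here` keeps
    // a handle on this one, so both threads can now update the same
    // non-atomic reference count concurrently.
    std::thread::spawn(move || drop(stream));

    drop(still_here);
}
```

With the bound in place (unsafe impl<ItemType: Send> Send for AsyncStream<ItemType>), or better with no manual impl at all, this example is rejected at compile time.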
As a crate user, if I want to make sure that the function is using a specific type I would just specify it with the turbofish notation, for example with_spawn_group::<_, _, i64, _>(/* ... */) (the generic arguments could be reordered by moving ResultType first, which would make this easier to type).
Ok but:
- implementation-wise this doesn't matter at all
- there's nothing preventing the user from moving it outside the closure
std::thread::scope using a closure is a limitation, not a feature. It needs to do so because just implementing Drop for Scope wouldn't be enough for safety, since Scope could be leaked (and thus never run its drop code), and running that join logic is required because Scope allows spawning threads with non-'static closures. This is not the case for your crate, because you don't allow spawning non-'static futures (and that's not even possible to do with async).
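For comparison, this is the kind of borrow that forces std::thread::scope into the closure shape: the spawned thread borrows a local, non-'static value, so the scope has to guarantee the join happens before the borrow ends:

```rust
fn main() {
    let data = vec![1, 2, 3];

    // The spawned thread borrows `data`, which lives on this stack frame,
    // so the scope must join it before `data` goes out of scope. This is
    // why the API is built around a closure rather than a Drop impl.
    let sum = std::thread::scope(|scope| {
        let handle = scope.spawn(|| data.iter().sum::<i32>());
        handle.join().unwrap()
    });

    println!("sum = {sum}");
}
```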