Difference between futures::StreamExt buffered and for_each_concurrent

Hi.

After reading the documentation, I think both work the same way.

Both run futures concurrently up to a limit.
I know that buffered keeps a buffer of pending futures from the upstream stream, and that for_each_concurrent runs a future for each item concurrently.

But both seem to work almost the same.

Is my understanding wrong?

I need a Tokio task that receives from a channel and runs an operation on each message.

First, I tried round-robin dispatch to workers.

Then I saw that futures::StreamExt has useful functions for running futures concurrently.

You thought they're mostly the same, and they are. So you were right. Right?

Keep in mind that concurrency != parallelism. tokio::spawn may run futures in parallel, but the combinators in the futures crate never run anything in parallel by themselves, e.g. future::join and the methods above.

Thanks. Do you have experience with Akka Streams? Akka Streams has a function for parallelism (mapAsync). I am looking for something like that.

To have your concurrent tasks actually run on more than one thread at a time, you have to spawn them with tokio::spawn. The JoinHandle that tokio::spawn returns can still be used together with buffered or for_each_concurrent to control the execution.

Thanks.

So does for_each_concurrent spawn tasks internally or not?

If the answer is yes, does buffered do the same?
Or does it just call the futures in sequence and push the pending ones to a FutureBuffered queue?

With for_each_concurrent and other non-spawning concurrency primitives, you will have a single thread that alternates between which task it is working on. Consider reading this article, which gives an example of tokio::join! and illustrates that it does not spawn the tasks.

Thanks.

From the tokio::join! documentation:
By running all async expressions on the current task, the expressions are able to run concurrently but not in parallel. This means all expressions are run on the same thread and if one branch blocks the thread, all other expressions will be unable to continue. If parallelism is required, spawn each async expression using tokio::spawn and pass the join handle to join!.
