How to use tokio::mpsc with futures::stream::buffered


I need a tokio task that listens on a channel and runs a future for each message it receives.
To improve throughput, I want to use futures::stream::buffered.

I can think of two ways:

  1. Use async-stream and wrap the channel receiver in it
  2. Use a futures-based channel (async-channel) that supports this natively

What do you mean by futures::stream::bounded?

Are you looking for tokio_stream::wrappers::ReceiverStream?

Thanks, but I didn't see any function with a name like for_each_concurrent or buffered. How does it work? The documentation doesn't describe it.

There's no function anywhere in the futures crate called bounded.

In general, I recommend using the mpsc channel like this:

while let Some(msg) = receiver.recv().await {
    // .. handle message ..
}
// .. loop exits when all senders are dropped

I'm so sorry, I meant buffered.

I always use this, but for this component
I need to read from the channel and run a future for each message received,
and that future may take a long time because it calls an external service.
So I was thinking of using something like buffered or for_each_concurrent to run the futures concurrently,
rather than waiting for the result of one call at a time.

If you import StreamExt, then you can call for_each_concurrent on the wrapper object I linked earlier.

