How should I produce a single stream of items from this data structure?

I have a Vec<Foo>, where Foo is a struct with a field that is a Tokio mpsc::Receiver.

I basically want to flatten all of these into one single Receiver. Ideally the merge would be fair, so that each inner receiver gets a chance to produce the next message whenever it has something ready.

I've looked at the Stream adapters for Tokio's Receiver, but they take ownership of the Receiver type. I would think there must be some way to do it with a Vec<&mut Receiver>, which would be ideal since I still want the Foo struct to own the receiver.

Is there any way to do this? And anything simpler than trying to use Stream?

If you can, change your design to use clones of a single Sender, so that you have only a single receiver.

Otherwise if you want to combine many channels into one, where messages are delivered immediately, then you'll have to copy the messages to a new channel. Basically spawn a future for each receiver individually that runs a loop that reads one channel and sends on another.

Taking ownership would make that most robust, since you'll know nothing else reads from these receivers at the same time. Storing the receiver in an Option and calling take() on it allows that while Foo keeps the field.

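Alternatively, switch to a channel type whose receivers can be cloned and share those. Tokio's mpsc::Receiver cannot be cloned, but async-channel's Receiver, for example, can. Those are like Arc.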

Anything with &mut is likely to be a dead end, since that's a temporary type that makes everything it touches also temporary and limited to a single scope.

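If you don't need to deliver messages immediately/concurrently and can drain the receivers one after another, then something like futures::stream::iter(vec.into_iter().map(|f| ReceiverStream::new(f.field))).flatten() should do it. In Tokio 1.x the Receiver does not implement Stream itself, hence the tokio_stream::wrappers::ReceiverStream wrapper.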
