Can you `.insert()` to a StreamMap in parallel?

I am constructing a bunch of WatchStreams that are then put into a StreamMap which is then in turn .next().await-ed in a loop just fine. Problem is, creating each WatchStream is a several seconds affair (connection to external servers) and thus creating them one by one in a serial loop is very slow.

Is there a way to parallelize the construction of the StreamMap? Using tokio::spawn(async move { ... }) predictably fails with "use of moved value" for the StreamMap itself.

I can't wrap StreamMap in an Arc<Mutex<...>> either, because I get a compiler error saying that a mutex guard should not be held across .await points. The advice in the official docs doesn't help me because I don't need to just temporarily borrow and modify something; I need to get a future -- via .next() -- out of the StreamMap that I will .await on afterwards. And since the Arc<Mutex<...>> solution requires that you only temporarily gain access to what's enclosed, modify it and drop the guard, I am out of luck.

I am probably having a brain fart and will thus appreciate any advice.

Thanks!

As predicted -- a brain fart and analysis paralysis. Needed to cool off.

Here's a complete and simplified example that works:

use env_logger::fmt::TimestampPrecision;
use env_logger::Env;
use futures::stream::select_all;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use log::{error, trace};
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration, Instant};
use tokio_stream::wrappers::UnboundedReceiverStream;

fn init_logging() {
    env_logger::Builder::from_env(Env::default().default_filter_or("trace"))
        .format_timestamp(Some(TimestampPrecision::Nanos))
        .init();
}

async fn exercise_add_workers_during_runtime() {
    // Make a shared reference to a list of futures and fill them up
    // while a main receiver loop is already trying to wait on them.

    let mut stream_producers: FuturesUnordered<_> = [("A", 2), ("B", 3), ("C", 4)]
        .map(|(name, i)| {
            // We don't seem to need `tokio::spawn` (namely true parallelism) here
            // because the different tasks yield at different times.
            async move {
                let (tx, rx) = mpsc::unbounded_channel::<String>();
                sleep(Duration::from_millis(480 / i)).await;

                async move {
                    for _ in 1..=i {
                        sleep(Duration::from_millis(120 / i)).await;
                        let to_send = format!("{}_{}", name, i);
                        match tx.send(to_send.clone()) {
                            Ok(_) => trace!("Sent {}", to_send),
                            Err(e) => error!("{:?}", e),
                        }
                    }
                }
                .await;

                ((name, i), UnboundedReceiverStream::new(rx))
            }
        })
        .into_iter()
        .collect();

    let mut streams = select_all(FuturesUnordered::new());

    loop {
        futures::select! {
            ((name, i), stream) = stream_producers.select_next_some() => {
                trace!("Produced stream {}_{}", name, i);
                streams.push(stream);
            },
            value = streams.select_next_some() => {
                trace!("Received {}", value);
            },
            complete => break,
        }
    }
}

#[tokio::main]
async fn main() {
    init_logging();

    let time = Instant::now();
    exercise_add_workers_during_runtime().await;
    trace!("Program took {:?}", time.elapsed());
}

This:

async move {
    // stuff
}.await;

is just a long way to write this:

// stuff

This:

let mut streams = select_all(FuturesUnordered::new());

Is just a confusing way to write this:

let mut streams = SelectAll::new();

Yeah I am definitely getting messed up. Tokio and Futures have a lot of tools and I'm getting to a point where I forget basic Rust stuff while trying to build my own knowledge base on what to use and when.

Let me revise the coding sample and re-post it a bit later. Thanks for keeping me straight.

Corrected it a bit and it works but not 100% sure I used the right mechanisms. If you are willing to, further feedback is greatly appreciated.

The exercise / scenario is:

  • Spawn stream-producing workers in parallel. This is a hard requirement since my real project mandates quick start-up where we make 20-100 parallel connections and stream stuff from each connection; each connection + handshake takes several seconds so not an option to do it serially.
  • We want the reader loop to begin immediately, hence using a fused stream + futures::select! works well for us -- we want to wait collectively on all streams as each of them has an item ready.
  • We want to add these streams as we go (which is the job of the tokio::spawn stream-producing workers). The important difference between the real project and this toy code is that in the real code returning the ReceiverStream cannot be instant as it is here. It's only given back to our code after a connection + a handshake is made and after we make a certain request (like "give us those records streamed in real time"). Sadly that skews this exercise code quite a bit, but never mind that for the moment.

use futures::stream::{self, SelectAll, StreamExt};
use log::{error, trace};
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
use tokio_stream::wrappers::UnboundedReceiverStream;

async fn exercise_add_workers_during_runtime() {
    let mut stream_producers = stream::iter([("A", 2), ("B", 3), ("C", 4)].map(|(name, i)| {
        let (tx, rx) = mpsc::unbounded_channel::<String>();

        tokio::spawn(async move {
            sleep(Duration::from_millis(480 / i)).await;

            for _ in 1..=i {
                sleep(Duration::from_millis(120 / i)).await;
                let to_send = format!("{}_{}", name, i);
                match tx.send(to_send.clone()) {
                    Ok(_) => trace!("Sent {}", to_send),
                    Err(e) => error!("{:?}", e),
                }
            }
        });

        ((name, i), UnboundedReceiverStream::new(rx))
    }))
    .fuse();

    let mut streams = SelectAll::new();

    loop {
        futures::select! {
            ((name, i), stream) = stream_producers.select_next_some() => {
                trace!("Produced stream {}_{}", name, i);
                streams.push(stream);
            },
            value = streams.select_next_some() => {
                trace!("Received {}", value);
            },
            complete => break,
        }
    }
}

Using a select! to add the streams is the correct way of doing it. I would probably go for tokio::select! with a pattern rather than select_next_some, which is a bit of a hack, but it would be equivalent to your code.

I'm still feeling my way through and I'm openly admitting I couldn't handle tokio::select! several days ago due to compilation errors I didn't know how to address.

Do you have an example of it handy?

    loop {
        tokio::select! {
            Some(((name, i), stream)) = stream_producers.next() => {
                trace!("Produced stream {}_{}", name, i);
                streams.push(stream);
            },
            Some(value) = streams.next() => {
                trace!("Received {}", value);
            },
            default => break,
        }
    }

The part that messes me up is mixing futures and tokio_stream. I have several exercises that do that and they work but the "why" is not super clear.

Well, they use the same Stream trait, so that's probably why.

I really need to slow down and take a few deep breaths, don't I? :grimacing::smile:

Anyhow, for the sake of the exercise and community knowledge I'll post another full snippet with your code. And I'll read a bit on the cross-pollination between futures and tokio / tokio_stream.

Had to replace default with else btw, then it worked.

Ah, I typed it from memory.

A question: I also tried matching with None in the tokio::select! variant but then the code is either stuck there forever (without break) or it exits (with break on the None clause).

This is slightly confusing. Observing the behaviour of the code makes me think that tokio::select! will simply move on to the next arm if something returns None and only get to the else clause (and thus terminate) when all arms return None. Is that a correct understanding? Is that the way tokio::select! terminates?

What were you matching None against?

Against stream_producers.next().

Yes, it continues waiting for other arms when one arm returns something that doesn't match the pattern. The else branch runs once all other branches have been disabled.

I'm not sure how you were matching on None. You can't have multiple patterns for a single branch, so I have no idea what code you tried here. Did you not match on Some when doing this?

If these things are not explained clearly in the documentation for tokio::select!, then that's something I would like to know about. Did you read it and find it confusing, or did you not read it at all?

I just had e.g. stream_result = stream_producers.next() and then inside that branch I did a match stream_result ... Some(...) ... None. That's not supposed to be done, I gather?

I did read it, several times even. It just got a bit overwhelming which granted, might be me being stupid.

But to directly answer your question, the Some(...) / None thing was not mentioned and an understanding about fused / non-fused streams is implied. That's not a criticism, I am pointing out what tripped me up personally. And I really couldn't infer from the docs that I should not do the above -- match on Some(...) / None inside each matching arm.

Very likely my still developing understanding.

Ah, if you put a match inside the arm, then the select will just end once the branch returns anything. If you wanted to keep polling other stuff, then you need to use the select's own pattern feature. It does not do the same thing as a match statement inside the branch.

I will look at the select docs and try to improve them.

Allow me to get back to you with a few small ideas on improving the docs. I am trying hard not to come across as pretentious here. I am kind of a polyglot, and at times I find some of the tokio docs hard to read because they seem to assume a lot of prior knowledge: not only of general Rust (quite okay, zero objections to that) but also of tokio and the async Rust ecosystem in general, and I am a bit 50/50 on that second part. It led to a very weird chicken-and-egg problem where I had to actively hunt for examples that are (a) complete, (b) rather basic (i.e. don't assume a lot of async expertise already) and (c) isolated (a huge problem for me was all the cross-play between futures and tokio, and I am still not very clear on that front).

As weird as that may sound, I even made a small special bookmark folder titled "Rust async docs demonstrating tokio / futures usage" since obviously I don't want to spam the forum and want to have a few solid pillars to lean onto when I get stuck (which happened many times the last month).

BTW the async_stream crate has been a life-saver. You remember from a previous thread that I actually had trouble even constructing a collection of futures that implement Stream. At one point I just got messed up by all the into_iter(), collect() with partial type annotations, stream::iter(), fuse() (not needed in every case, but I do need it often) and what-have-you. The async_stream crate kind of made all of these irrelevant, which was a huge cognitive relief.