Keep state while processing a Futures Stream

I'm using Futures 0.3, and the really awesome .for_each_concurrent. However, I'm trying to find a way to keep some state within the closure. Here's a (somewhat contrived) example:

use std::collections::HashMap;
use futures::StreamExt;

fn main() {
    let consumer = futures::stream::repeat("foo!".to_string());
    let mut counter = HashMap::<String, usize>::new();

    futures::executor::block_on(consumer.for_each_concurrent(None, |message| {
        async {
            // Process message...

            // Count messages
            *counter.get_mut(&message).unwrap_or(&mut 0) += 1;

            // Use the counter here...
        }
    }))
}

(Playground link here)

This gives me the error "returns a reference to a captured variable which escapes the closure body".

Basically, I need a way to initialize a HashMap for use within the closure (and only within the closure), but there doesn't seem to be any good way to do this.

Any help would be greatly appreciated.

-Jack

I'm not entirely sure myself. I typically create a context to hold shared state and compose that with my data (aka messages) as it travels through the pipeline of futures for the task. With futures 0.3, I'm struggling to appease the Rust compiler.

Since your futures are concurrent, and you may have state shared across threads (if the task executor is multithreaded), I wrapped the counter in a futures::lock::Mutex.

Playground link [1].

If you expect missing keys in your counter HashMap to start at zero, I would recommend using the entry API. There's an example in the playground link. As it stands, your &mut 0 is a temporary that is never stored in the HashMap, so the increment is lost for any key that isn't already present.
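To make the entry API suggestion concrete, here is a minimal std-only sketch. The function name count_with_entry is made up for illustration, and this is not the code behind the playground link. entry(...).or_insert(0) stores the zero in the map before the increment, whereas unwrap_or(&mut 0) increments a temporary that is thrown away:

```rust
use std::collections::HashMap;

/// Hypothetical helper: counts how often each message occurs.
fn count_with_entry(messages: &[&str]) -> HashMap<String, usize> {
    let mut counter = HashMap::new();
    for message in messages {
        // Inserts 0 if the key is missing, then increments the stored value.
        *counter.entry(message.to_string()).or_insert(0) += 1;
    }
    counter
}
```

Calling count_with_entry(&["foo!", "foo!", "bar"]) should yield a map where "foo!" maps to 2 and "bar" maps to 1.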

Update: updated example [2] to use async move, so the context argument can outlive the closure.

Cheers

[1]: Rust Playground

[2]: Rust Playground

Thanks, @boxofrox!! That definitely got me past the borrow checker, which was good. Unfortunately, the code ended up pretty ugly, and the Mutexes are worrisome, especially since my executor is single-threaded (and will likely always be). It also seemed to serialize the processing for some reason, which I'm still trying to understand (probably the Mutexes, but it's kind of hard to understand why, since they're of very limited scope).

Anyway, thanks again for the help!!!

-Jack

Note that a future only hands control back to the executor when it reaches an .await that isn't ready yet (waiting for I/O, a timer, a lock, and so on). If it never does, the executor will let it run to completion to the exclusion of all other tasks that are waiting, even more so on a single-threaded executor. This aspect of Rust futures has bitten me a few times.

If the mutex guards are sticking around until the processing is done, even though they're no longer needed, you can wrap the locked section in a code block { } so the guard drops sooner.

E.g.

    futures::executor::block_on(consumer
        .map(move |msg| Context::new(msg, counter.clone()))
        .for_each_concurrent(None, |context| {
            async move {
                // Process message...
    
                // Count messages
                {
                    let mut counter = context.counter.lock().await;
                    *counter.entry(context.message.clone()).or_insert(0) += 1;
                }
                // Do more work...
            }
        })
    )

Also, since you're using a single-threaded executor, have you tried not using mutexes? I don't know whether that would work. I didn't know what executor was used in my example and just went with the safest option.

I iterated on the example [1]. You can stuff the mutex into a custom datatype to hide it from the rest of your code. That might keep the ugliness down.

I found in this example, at least with the single-threaded executor, that Arc wasn't necessary, and a simple &'a Context can be shared among all tasks. Unfortunately, because of all the shared references, some locking mechanism is still necessary, so I kept the Mutex. It might be possible to use a RefCell or Cell instead, but I've never used these before.

[1]: Rust Playground

You can use Rc instead of Arc and RefCell instead of Mutex. Modified example.

You might not need the Rc either, if you can teach the compiler that all the futures have a lifetime no longer than that of the borrowed Counter, but that might need some additional fiddling.
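For readers without the linked example, here is a rough sketch of the Rc/RefCell idea. It is not the code behind the link. To stay runnable with the standard library alone, it replaces for_each_concurrent and the futures executor with a hypothetical run_all helper that polls boxed futures in a loop on one thread; count_shared, Task and NoopWaker are likewise made-up names. The part that should carry over is that each task clones the Rc and borrows the RefCell only for a single statement:

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Hypothetical alias for a boxed, non-Send task.
type Task = Pin<Box<dyn Future<Output = ()>>>;

/// Waker that does nothing; enough for a loop that re-polls unconditionally.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for a single-threaded executor: polls every task
/// in turn on the current thread until all of them have completed.
fn run_all(mut tasks: Vec<Task>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while !tasks.is_empty() {
        // Keep only the tasks that are still pending.
        tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
    }
}

/// Counts messages from tasks that share one map through Rc<RefCell<_>>.
fn count_shared(messages: Vec<String>) -> HashMap<String, usize> {
    let counter = Rc::new(RefCell::new(HashMap::<String, usize>::new()));

    let tasks: Vec<Task> = messages
        .into_iter()
        .map(|message| {
            // Each task owns its own handle to the shared map.
            let counter = Rc::clone(&counter);
            Box::pin(async move {
                // Process message...

                // The mutable borrow is a temporary that ends with this
                // statement, so it is never held across an .await point.
                *counter.borrow_mut().entry(message).or_insert(0) += 1;
            }) as Task
        })
        .collect();

    run_all(tasks);

    // All tasks are finished and dropped, so nothing else borrows the map.
    counter.take()
}
```

With the futures crate, the same pattern should apply inside the for_each_concurrent closure: clone the Rc before the async move block and move the clone in. Avoid holding a borrow_mut() guard across an .await, since another task trying to borrow the map in the meantime would panic.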

Thanks @boxofrox and @Matthias247! The combination of the Context and the Rc/RefCell did the trick! I was also able to hide some (but not all) of the ugliness within a struct, which made it somewhat nicer.

Thanks again!!!
