Associate each future in a collection of futures with a sender

Hello,

I have a runtime which listens to messages coming over a channel, the messages either tell the runtime to start a new task or to ask the running tasks to stop. I'm using FuturesUnordered to track the futures and to get the results as they finish and I'm keeping a collection of senders one for each of the running actors.

I like FuturesUnordered because it makes managing the futures easier but my problem is there is nothing to link the futures to their senders so as the futures finish the senders hang around and the collection of senders grows until the runtime ends.

I also don't know which sender belongs to which future this isn't a problem in the example but will be in the future.

So is there something like FuturesUnordered which can associate a piece of data with each future? Or should I make a struct like that myself or should I abandon FuturesUnordered and do something else.

I would have thought that other people would have ran into this or similar so maybe thats a sign I'm going about this in the wrong way entirely.

Example (rust playground):

use futures::{stream::FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::{
    sync::mpsc::{self, error::SendError},
    task::JoinHandle,
};

#[derive(Debug)]
enum Message {
    StartAnother,
    Stop,
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = mpsc::channel(16);
    let mut runtime = Runtime::new(receiver);
    let _ = tokio::join!(runtime.run(), send_messages(sender));
}

async fn send_messages(sender: mpsc::Sender<Message>) -> Result<(), SendError<Message>> {
    for _ in 0..3 {
        sender.send(Message::StartAnother).await?;
    }

    sender.send(Message::Stop).await?;

    // Implicit drop of the sender will cause the runtime to stop
    Ok(())
}

struct Runtime {
    receiver: mpsc::Receiver<Message>,
    senders: Vec<mpsc::Sender<Message>>,
    futures: FuturesUnordered<JoinHandle<u32>>,
}

impl Runtime {
    fn new(receiver: mpsc::Receiver<Message>) -> Self {
        Self {
            receiver,
            senders: vec![],
            futures: FuturesUnordered::new(),
        }
    }

    async fn run(&mut self) {
        let mut id = 0;
        loop {
            tokio::select! {
                message = self.receiver.recv() => {
                    match message {
                        Some(Message::StartAnother) => {
                            println!("Starting a new task");
                            let (sender, receiver) = mpsc::channel(16);
                            let actor = Actor { id, receiver };
                            id += 1;
                            let handle = tokio::spawn(run(actor));
                            self.futures.push(handle);
                            self.senders.push(sender);
                        }
                        Some(Message::Stop) => {
                            println!("Got stop message, runtime stopping");
                            for sender in &mut self.senders {
                                if let Err(e) = sender.send(Message::Stop).await {
                                    println!("Failed to send stop message: {e}");
                                }
                            }
                            break;
                        }
                        _ =>  {
                                println!("Channel closed, runtime stopping");
                                break;
                            }
                        }
                    }
                Some(task_done) = self.futures.next() => {
                    println!("{task_done:?} is done");
                }
            }
        }

        while let Some(task_done) = self.futures.next().await {
            println!("{task_done:?} is done at the end");
        }
    }
}

struct Actor {
    id: u32,
    receiver: mpsc::Receiver<Message>,
}

impl Actor {
    async fn run(&mut self) -> u32 {
        println!("{} is running", self.id);
        loop {
            tokio::select! {
                _ = self.receiver.recv() => println!("Actor {} stopping (got stop message or channel is closed)", self.id),
                _ = tokio::time::sleep(Duration::from_secs(1)) => {
                    println!("Actor {} has completed its work", self.id);
                    break;
                }
            }
        }
        self.id
    }
}

async fn run(mut actor: Actor) -> u32 {
    actor.run().await
}

[package]
name = "runtime-mvp"
version = "0.1.0"
edition = "2021"

[dependencies]
futures = "0.3.25"
tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread", "sync", "time"] }

Here are two ways you could do this:

  1. Instead of giving JoinHandles to the FuturesUnordered, you can create a Future which contains the JoinHandle and returns its result together with an ID value of your choice. This could be done with futures::future::join(handle, std::future::ready(id)) in order to get such a future that you can store in your struct. (If you didn't need to write the type out, an async block is simpler: async move { (handle.await, id) }.)

    join in futures::future - Rust

  2. When you push a Sender into the senders vector, check if any of the existing Senders are closed, and discard them since they are no longer needed. (For good performance, make this check only if the Vec has no spare capacity such that it would have to grow, or a similar “amortized” approach.)

1 Like