Hello,
I have a runtime which listens for messages coming over a channel. Each message either tells the runtime to start a new task or asks the running tasks to stop. I'm using FuturesUnordered to track the futures and to get their results as they finish, and I'm keeping a collection of senders, one for each running actor.
I like FuturesUnordered because it makes managing the futures easier, but my problem is that nothing links the futures to their senders. As the futures finish, their senders hang around, so the collection of senders grows until the runtime ends.
I also don't know which sender belongs to which future. This isn't a problem in the example, but it will be later on.
So is there something like FuturesUnordered which can associate a piece of data with each future? Or should I make a struct like that myself, or should I abandon FuturesUnordered and do something else?
I would have thought that other people would have run into this or something similar, so maybe that's a sign I'm going about this in the wrong way entirely.
Example (rust playground):
use futures::{stream::FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::{
    sync::mpsc::{self, error::SendError},
    task::JoinHandle,
};

#[derive(Debug)]
enum Message {
    StartAnother,
    Stop,
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = mpsc::channel(16);
    let mut runtime = Runtime::new(receiver);
    let _ = tokio::join!(runtime.run(), send_messages(sender));
}

async fn send_messages(sender: mpsc::Sender<Message>) -> Result<(), SendError<Message>> {
    for _ in 0..3 {
        sender.send(Message::StartAnother).await?;
    }
    sender.send(Message::Stop).await?;
    // Implicit drop of the sender will cause the runtime to stop
    Ok(())
}

struct Runtime {
    receiver: mpsc::Receiver<Message>,
    senders: Vec<mpsc::Sender<Message>>,
    futures: FuturesUnordered<JoinHandle<u32>>,
}
impl Runtime {
    fn new(receiver: mpsc::Receiver<Message>) -> Self {
        Self {
            receiver,
            senders: vec![],
            futures: FuturesUnordered::new(),
        }
    }

    async fn run(&mut self) {
        let mut id = 0;
        loop {
            tokio::select! {
                message = self.receiver.recv() => {
                    match message {
                        Some(Message::StartAnother) => {
                            println!("Starting a new task");
                            let (sender, receiver) = mpsc::channel(16);
                            let actor = Actor { id, receiver };
                            id += 1;
                            let handle = tokio::spawn(run(actor));
                            self.futures.push(handle);
                            self.senders.push(sender);
                        }
                        Some(Message::Stop) => {
                            println!("Got stop message, runtime stopping");
                            for sender in &self.senders {
                                if let Err(e) = sender.send(Message::Stop).await {
                                    println!("Failed to send stop message: {e}");
                                }
                            }
                            break;
                        }
                        None => {
                            println!("Channel closed, runtime stopping");
                            break;
                        }
                    }
                }
                Some(task_done) = self.futures.next() => {
                    println!("{task_done:?} is done");
                }
            }
        }
        // Drain the tasks that are still running after the loop exits
        while let Some(task_done) = self.futures.next().await {
            println!("{task_done:?} is done at the end");
        }
    }
}
struct Actor {
    id: u32,
    receiver: mpsc::Receiver<Message>,
}

impl Actor {
    async fn run(&mut self) -> u32 {
        println!("{} is running", self.id);
        loop {
            tokio::select! {
                _ = self.receiver.recv() => {
                    println!("Actor {} stopping (got stop message or channel is closed)", self.id);
                    // Leave the loop here; otherwise the actor keeps running after being told to stop
                    break;
                }
                _ = tokio::time::sleep(Duration::from_secs(1)) => {
                    println!("Actor {} has completed its work", self.id);
                    break;
                }
            }
        }
        self.id
    }
}

async fn run(mut actor: Actor) -> u32 {
    actor.run().await
}
Cargo.toml:
[package]
name = "runtime-mvp"
version = "0.1.0"
edition = "2021"
[dependencies]
futures = "0.3.25"
tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread", "sync", "time"] }