How to share tokio JoinSet across different tasks/threads?

Hi, for the example code below:

use std::io::Result as IoResult;
use tokio::task::{AbortHandle, JoinSet};
use std::collections::HashMap;
use std::sync::Arc;
use std::ptr::NonNull;
use std::sync::RwLock;
use crossterm::event::{
  DisableFocusChange, DisableMouseCapture,
  EnableFocusChange, EnableMouseCapture, Event,
  EventStream, KeyCode, KeyEventKind,
  KeyEventState, KeyModifiers,
};

type TaskResult = IoResult<()>;

struct EventLoop {
  // The contents here don't matter.
  // This just shows that the event loop may hold
  // a lot of data to share across different tasks/threads.
  pub some_data: Arc<RwLock<HashMap<i32, i32>>>, 

  // All spawned tasks should be managed by this,
  // so I could easily detach/join/abort all of them.
  pub tasks: JoinSet<TaskResult>,
}

#[derive(Copy, Clone)]
struct Context {
  // Some shared global data
  pub some_data: Arc<RwLock<HashMap<i32, i32>>>, 

  // Question-1: How to share JoinSet here?
  pub tasks: &'static mut JoinSet<TaskResult>, 
}

async fn job1(ctx: Context) -> TaskResult {
  // Update the shared global data after taking the write lock
  ctx.some_data.write().unwrap().insert(1, 1);
  Ok(())
}

async fn job2(ctx: Context) -> TaskResult {
  // Question-2: How to spawn `job1` inside this `job2`?
  // I need to use the JoinSet `tasks` to spawn `job1` here.
  ctx.tasks.spawn(async move { job1(ctx).await });
  Ok(())
}

impl EventLoop {
  pub async fn new() -> IoResult<Self> {
    Ok(EventLoop {
      some_data: Arc::new(RwLock::new(HashMap::new())),
      tasks: JoinSet::new(),
    })
  }

  pub fn init(&mut self) -> IoResult<()> {
    let ctx = Context {
      some_data: self.some_data.clone(),
      // Question-3: The `'static` lifetime does not compile,
      // because `self.tasks` does not live for `'static`.
      tasks: &'static mut self.tasks
    };
    self.tasks.spawn(async move { job2(ctx).await });
    Ok(())
  }

  pub async fn run(&mut self) -> IoResult<()> {
    let mut reader = EventStream::new();
    unsafe {
      // Fix multiple mutable references.
      let mut raw_self = NonNull::new(self as *mut EventLoop)
          .unwrap();
      loop {
        tokio::select! {
          // Receive terminal keyboard/mouse events
          polled_event = reader.next() => match polled_event {
            // If received some special keys, exit the loop
            break;
          }

          // Join the completed tasks
          Some(joined_task) = raw_self.as_mut().tasks.join_next() => match joined_task {
            Ok(task_result) => {/* Skip */}
            Err(e) => println!("{}", e),
          }
        }
      }
    }
  }
}

#[tokio::main]
async fn main() -> IoResult<()> {
  let mut evloop = EventLoop::new().await?;
  evloop.init()?;
  evloop.run().await?;
  Ok(())
}

As mentioned in the comments of the sample code above, I want to use JoinSet along with crossterm's EventStream, so that I can both receive terminal keyboard/mouse events and spawn async tasks in the background.

But to let tasks update the data inside the global context, or even spawn new tasks from inside a task, I need to share the JoinSet across different tasks.

How can I do that?

I read the tokio tutorial (Shared state | Tokio - An asynchronous Rust runtime). It recommends two strategies for shared state:

  1. Share state with mutex, which is exactly what I'm doing in this sample code.
  2. Spawn a task to manage the state and use message passing to operate on it.

I'm not sure whether I should choose the 2nd strategy. Would it solve my issue here?

Based on the 2nd strategy in the tokio tutorial I shared in the question (Shared state | Tokio - An asynchronous Rust runtime), I think:

Use an mpsc channel to send messages from inside (multiple) tasks and receive them in the main loop. Each message is a task name (in this case, job1), so the main loop can spawn the corresponding task in the main thread.

Is it a correct solution?

A few tips:

  • You probably should not be using JoinSet here. Instead I recommend CancellationToken for being able to abort tasks, and optionally TaskTracker for being able to wait for them to exit / wait for abort to finish. See graceful shutdown for more info on this.
  • As a general tip, trying to share something with &'static or &'static mut is always always the wrong way, every time.
    • In the case of JoinSet, this type does not come with any utilities for sharing it.
    • You could still do so. Generally, I recommend using the actor pattern for sharing values for most cases.
    • However, if you switch to using TaskTracker, then it does come built-in with utilities for making it easier to share. Specifically, you can share it by cloning it. See the documentation for more info. The same applies to the cancellation token.

Even though I recommend you switch to TaskTracker instead of JoinSet, you can still benefit from the actor pattern with your application. I strongly recommend reading my article about it. Also, the talk about actors (linked in the article) was done after the article, and has more tips than the article itself, so I also recommend my talk.


Thanks a lot for your tips. Every line seems so helpful! I will take a look when I am back at keyboard.

The actor pattern article is great as well!
