Hi all,
I am trying to write a small program using sway (through swayipc_async) and tokio. In short, the program spawns tokio tasks dynamically in response to sway events. Each task runs for a moment and then ends. When processing an event, I need to check whether a task is already running for it; if so, I just send the task a message to update the value it handles instead of launching a new task.
This is the reason why I store an Arc containing a Sender in the TaskSpawner and I pass a clone of it to each spawned task.
When the program stops (with Ctrl+C), I want to wait for the tasks to end before actually exiting. To do so, I followed tokio's "Graceful Shutdown" tutorial, which waits on the rx side of a channel. When I try to drop the sender kept by the task spawner, I am unable to do it: the compiler complains that the Arc is moved. Do you have any idea how I should do that?
Here is the code:
use futures_util::stream::StreamExt;
use std::collections::HashMap;
use std::sync::Arc;
use swayipc_async::{Connection, Event, EventType};
use parking_lot::Mutex;
use tokio::signal;
use tokio::sync::mpsc;
use tokio::task;
use tokio::time::timeout;
use tokio::time::Duration;
/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
    SetTarget { target: f32 },
}
struct TaskManager {
    task_spawner: Arc<Mutex<TaskSpawner>>,
}
struct TaskContext {
    rx_ch: mpsc::Receiver<Command>,
}
struct TaskSpawner {
    tx_channels: Arc<Mutex<HashMap<i64, mpsc::Sender<Command>>>>,
    ending_channel_tx: Arc<mpsc::Sender<()>>,
    ending_channel_rx: Arc<Mutex<mpsc::Receiver<()>>>,
}
impl TaskSpawner {
    pub fn new() -> Self {
        let (tx, mut rx) = mpsc::channel(1);
        return TaskSpawner {
            tx_channels: Arc::new(Mutex::new(HashMap::new())),
            ending_channel_tx: Arc::new(tx),
            ending_channel_rx: Arc::new(Mutex::new(rx)),
        };
    }
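    pub async fn stop(&mut self) {
        // This is the line the compiler rejects: the Arc cannot be moved
        // out of `self` here, since `stop` only has `&mut self`.
        drop(self.ending_channel_tx);
        println!("{:?}", (self.ending_channel_rx).lock().recv().await);
    }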
    pub fn get_tx_channel(&mut self, task_id: i64) -> Option<mpsc::Sender<Command>> {
        let mut tx_channels_locked = self.tx_channels.lock();
        if tx_channels_locked.contains_key(&task_id) {
            return Some(tx_channels_locked.get(&task_id).unwrap().clone());
        }
        return None;
    }
    pub fn spawn_task(&mut self, task_id: i64) {
        let mut tx_channels_locked = self.tx_channels.lock();
        if !tx_channels_locked.contains_key(&task_id) {
            let (tx, mut rx) = mpsc::channel(32);
            tx_channels_locked.insert(task_id, tx);
            let ending_channel_tx = (*self.ending_channel_tx).clone();
            tokio::spawn(async move {
                println!("Task {:?} started !", task_id);
                let mut counter = 5;
                loop {
                    match timeout(Duration::from_millis(1000), rx.recv()).await {
                        Err(_) => {
                            counter = counter - 1;
                            println!("apply target {:?} !", counter);
                            if counter <= 0 {
                                break;
                            }
                        }
                        Ok(v) => match v {
                            Some(x) => {
                                println!("changing target {:?} !", x);
                            }
                            None => (),
                        },
                    }
                }
                drop(ending_channel_tx);
                println!("Task {:?} finished !", task_id);
            });
        }
    }
}
impl TaskManager {
    pub fn new() -> Self {
        let t = TaskManager {
            task_spawner: Arc::new(Mutex::new(TaskSpawner::new())),
        };
        return t;
    }
    pub async fn stop(&mut self) {
        let mut locked_spawner = self.task_spawner.lock();
        (*locked_spawner).stop().await;
    }
    pub fn start_event_listening(&mut self) -> task::JoinHandle<()> {
        let task_spawner = self.task_spawner.clone();
        return tokio::spawn(async move {
            let subs = [EventType::Window];
            let mut events = Connection::new()
                .await
                .unwrap()
                .subscribe(subs)
                .await
                .unwrap();
            loop {
                match events.next().await {
                    Some(event) => {
                        match event.unwrap() {
                            Event::Window(w) => {
                                {
                                    let tx_ch: Option<mpsc::Sender<Command>>;
                                    {
                                        // Keep the lock scope short so the guard is not held across an await.
                                        let mut locked_spawner = task_spawner.lock();
                                        tx_ch = (*locked_spawner).get_tx_channel(w.container.id);
                                    }
                                    match tx_ch {
                                        Some(v) => {
                                            let cmd = Command::SetTarget { target: 0.2 };
                                            // Report success only when the send actually succeeded.
                                            if v.send(cmd).await.is_ok() {
                                                println!("Target change sent");
                                            }
                                        }
                                        None => {
                                            let mut locked_spawner = task_spawner.lock();
                                            (*locked_spawner).spawn_task(w.container.id);
                                            println!("Start task for {:?}", w.container.id);
                                        }
                                    }
                                }
                            }
                            _ => unreachable!(),
                        }
                    }
                    None => (),
                }
            }
        });
    }
}
#[tokio::main]
async fn main() {
    let mut manager = TaskManager::new();
    manager.start_event_listening();
    match signal::ctrl_c().await {
        Ok(()) => {
            println!("stopping");
            manager.stop().await;
        }
        Err(err) => {
            eprintln!("Unable to listen for shutdown signal: {}", err);
        }
    }
}
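
For reference, the shutdown step described above can be reduced to a minimal sketch. It is not the tokio version: to stay self-contained it swaps in std threads and std::sync::mpsc, where recv() returns Err once every sender is dropped (tokio's mpsc recv().await returns None in the analogous situation). The names Spawner, done_tx and done_rx are hypothetical stand-ins for TaskSpawner and its ending channel. The one idea it illustrates is keeping the sender in an Option, so that a method taking &mut self can move it out with take() and drop it; whether that fits the real program is an assumption.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for `TaskSpawner`, reduced to the shutdown logic.
struct Spawner {
    // `Option` lets `stop` move the sender out of `&mut self` with `take()`.
    done_tx: Option<Sender<()>>,
    done_rx: Receiver<()>,
}

impl Spawner {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel();
        Spawner { done_tx: Some(tx), done_rx: rx }
    }

    /// Starts a worker that holds its own clone of the sender.
    fn spawn_task(&self, id: u32) {
        // Cloning the `Sender` itself is enough here; no `Arc` is involved.
        let done_tx = self.done_tx.clone().expect("spawner already stopped");
        thread::spawn(move || {
            // Simulated work.
            thread::sleep(Duration::from_millis(50 * u64::from(id)));
            // Dropping the clone signals that this worker is finished.
            drop(done_tx);
        });
    }

    /// Drops the spawner's own sender, then blocks until every worker's
    /// clone is gone. Returns `true` if the channel closed without any
    /// message having been sent.
    fn stop(&mut self) -> bool {
        // `take()` leaves `None` behind, so nothing is moved out of `self`.
        drop(self.done_tx.take());
        // `recv` returns `Err` only once all senders have been dropped.
        self.done_rx.recv().is_err()
    }
}
```

Typical use would be to create a Spawner, call spawn_task a few times, and then call stop() once at shutdown; stop() only returns after the last worker has dropped its clone.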