How to drop a Sender stored in a self

Hi All

I am trying to make a little program using sway and tokio. To summarize, the latter creates tokio tasks dynamically on sway events. The task runs for a moment and then ends. When processing events, I need to check if a task is already running and in this case I just update the value handled by the task using a message instead of launching a new task.
This is the reason why I store an Arc containing a Sender in the TaskSpawner and I pass a clone of it to each spawned task.

When the program stops (with ctrl+c) I want to wait for the task to end before actually ending. To do so I followed the "Graceful Shutdown" tutorial of tokio with the wait on a rx channel. When I want to drop the sender kept by the task spawner I am unable to do it as it complains about the fact the Arc is moved. Do you have an idea about how I shall do that ?

Here is the code:

use futures_util::stream::StreamExt;
use std::collections::HashMap;
use std::sync::Arc;
use swayipc_async::{Connection, Event, EventType};
use parking_lot::Mutex;
use tokio::signal;
use tokio::sync::mpsc;
use tokio::task;
use tokio::time::timeout;
use tokio::time::Duration;

/// Multiple different commands are multiplexed over a single channel.

#[derive(Debug)]
enum Command {
    SetTarget { target: f32 },
}

struct TaskManager {
    task_spawner: Arc<Mutex<TaskSpawner>>,
}

struct TaskContext {
    rx_ch: mpsc::Receiver<Command>,
}

struct TaskSpawner {
    tx_channels: Arc<Mutex<HashMap<i64, mpsc::Sender<Command>>>>,
    ending_channel_tx: Arc<mpsc::Sender<()>>,
    ending_channel_rx: Arc<Mutex<mpsc::Receiver<()>>>,
}

impl TaskSpawner {
    pub fn new() -> Self {
        let (tx, mut rx) = mpsc::channel(1);

        return TaskSpawner {
            tx_channels: Arc::new(Mutex::new(HashMap::new())),
            ending_channel_tx: Arc::new(tx),
            ending_channel_rx: Arc::new(Mutex::new(rx)),
        };
    }

    pub async fn stop(&mut self) {
        drop(self.ending_channel_tx);
        println!("{:?}", (self.ending_channel_rx).lock().recv().await);
    }

    pub fn get_tx_channel(&mut self, task_id: i64) -> Option<mpsc::Sender<Command>> {
        let mut tx_channels_locked = self.tx_channels.lock();
        if tx_channels_locked.contains_key(&task_id) {
            return Some(tx_channels_locked.get(&task_id).unwrap().clone());
        }
        return None;
    }

    pub fn spawn_task(&mut self, task_id: i64) {
        let mut tx_channels_locked = self.tx_channels.lock();
        if !tx_channels_locked.contains_key(&task_id) {
            let (tx, mut rx) = mpsc::channel(32);
            tx_channels_locked.insert(task_id, tx);
            let ending_channel_tx = (*self.ending_channel_tx).clone();
            tokio::spawn(async move {
                println!("Task {:?} started !", task_id);
                let mut counter = 5;
                loop {
                    match timeout(Duration::from_millis(1000), rx.recv()).await {
                        Err(_) => {
                            counter = counter - 1;
                            println!("apply target {:?} !", counter);
                            if counter <= 0 {
                                break;
                            }
                        }
                        Ok(v) => match v {
                            Some(x) => {
                                println!("changing target {:?} !", x);
                            }
                            None => (),
                        },
                    }
                }
                drop(ending_channel_tx);
                println!("Task {:?} finished !", task_id);
            });
        }
    }
}

impl TaskManager {
    pub fn new() -> Self {
        let t = TaskManager {
            task_spawner: Arc::new(Mutex::new(TaskSpawner::new())),
        };
        return t;
    }
    pub async fn stop(&mut self) {
        let mut locked_spawner = self.task_spawner.lock();
        (*locked_spawner).stop().await;
    }

    pub fn start_event_listening(&mut self) -> task::JoinHandle<()> {
        let task_spawner = self.task_spawner.clone();
        return tokio::spawn(async move {
            let subs = [EventType::Window];
            let mut events = Connection::new()
                .await
                .unwrap()
                .subscribe(subs)
                .await
                .unwrap();
            loop {
                match events.next().await {
                    Some(event) => {
                        match event.unwrap() {
                            Event::Window(w) => {
                                {
                                    let tx_ch: Option<mpsc::Sender<Command>>;
                                    {
                                        let mut locked_spawner = task_spawner.lock();
                                        tx_ch = (*locked_spawner).get_tx_channel(w.container.id);
                                    }
                                    match tx_ch {
                                        Some(v) => {
                                            let cmd = Command::SetTarget { target: 0.2 };
                                            if v.send(cmd).await.is_err() {
                                                println!("Target change sent");
                                            }
                                        }
                                        None => {
                                            let mut locked_spawner = task_spawner.lock();
                                            (*locked_spawner).spawn_task(w.container.id);
                                            println!("Start task for  {:?}", w.container.id);
                                        }
                                    }
                                }
                            }
                            _ => unreachable!(),
                        }
                    }
                    None => (),
                }
            }
        });
    }
}

#[tokio::main]
async fn main() {
    let mut manager = TaskManager::new();
    manager.start_event_listening();
    match signal::ctrl_c().await {
        Ok(()) => {
            println!("stopping");
            manager.stop().await;
        }
        Err(err) => {
            eprintln!("Unable to listen for shutdown signal: {}", err);
        }
    }
}

Store it in an Option and set it to None.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.