Why do I need to clone an Arc instance twice in order to use it in a thread closure

I'm writing some code that uses a single background thread to process updates posted to one or more channels. In the TaskScheduler::new() function I create an unbounded channel and a map. The channel is used to notify of updates, and the map links an ID to a function that should be called to perform the update. My question is: why do I need to clone the Arc-wrapped map twice, once outside the closure body and once inside it? If I don't clone it before using it in the closure I get:

use of moved value

Which makes sense. However, if I don't clone it in the closure, I get:

cannot move out of `queue_copy`, a captured variable in an `Fn` closure

Which does not make sense to me. And something similar happens for the receiver. What obvious thing am I missing that makes it required to clone the Arc instance twice?
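For reference, the shape of the problem boils down to something like this reduced sketch (spawn_worker and run here are made-up stand-ins for my real functions):

use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Stand-in for create_thread below: note the Fn() bound.
fn spawn_worker<F: Fn() + Send + 'static>(f: F) -> JoinHandle<()> {
    thread::spawn(f)
}

// Stand-in for the background update loop, which takes the map by value.
fn run(shared: Arc<Mutex<i32>>) {
    *shared.lock().unwrap() += 1;
}

fn main() {
    let shared = Arc::new(Mutex::new(0));

    // Clone #1: lets the closure own a handle while `shared` stays usable here.
    let shared_for_thread = shared.clone();

    let handle = spawn_worker(move || {
        // Clone #2: without this I get "cannot move out of a captured variable
        // in an `Fn` closure".
        run(shared_for_thread.clone());
    });

    handle.join().unwrap();
    println!("{}", shared.lock().unwrap());
}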

Code below and also on the playground

use std::{
    collections::HashMap,
    fmt::Display,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc, Mutex,
    },
    thread::{self, JoinHandle},
    time::Duration,
};

use crossbeam_channel::{Receiver, Sender};

/// The TaskID counter value for the 'NONE' ID.
static NONE_TASK_ID: usize = 0;

/// Atomic counter for TaskID instances
/// The counter starts at 1 because 0 is reserved for the 'NONE' ID.
static TASK_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Defines a unique ID for task types
///
/// - Can be cloned safely
/// - Can be created safely across many threads
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct TaskID {
    /// The internal value that forms the actual ID. This is set in a
    /// thread-safe manner.
    /// Based on this StackOverflow answer: https://stackoverflow.com/a/32936288/539846
    id: usize,
}

impl TaskID {
    /// Create a reference for the current ID.
    pub fn as_ref(&self) -> &Self {
        self
    }

    /// Returns a value indicating if the given ID is the [none] ID.
    pub fn is_none(&self) -> bool {
        self.id == NONE_TASK_ID
    }

    /// Create a new ID in a thread safe manner.
    pub fn new() -> Self {
        Self {
            id: TASK_ID_COUNTER.fetch_add(1, Ordering::SeqCst),
        }
    }

    /// Returns the TaskID that doesn't belong to any FrameElement. Can be used to initialize
    /// IDs that are unknown.
    pub fn none() -> Self {
        Self { id: NONE_TASK_ID }
    }
}

impl Display for TaskID {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "TaskID [{}]", self.id)
    }
}

/// An inner struct that stores the state of the task scheduler queue
struct TaskSchedulerQueueState {
    /// The map of functions that the task scheduler will run when a notification
    /// of change comes through.
    ready_queue: HashMap<TaskID, Box<dyn Fn() + Sync + Send>>,

    /// A flag indicating whether or not the task scheduler jobs are being cancelled.
    cancelled: bool,
}

impl TaskSchedulerQueueState {
    /// Creates a new instance of the TaskScheduleQueueState structure.
    fn new() -> Self {
        Self {
            ready_queue: HashMap::new(),
            cancelled: false,
        }
    }
}

/// Defines a scheduler that waits for updates to tasks and executes a closure when it
/// gets a notification of an update.
pub struct TaskScheduler {
    /// The template of the channel sender that is used to notify the scheduler when
    /// there is an update for one of the tasks
    sender_template: Sender<TaskID>,

    /// The thread handle for the background update thread
    background_runner: JoinHandle<()>,

    /// The queue containing the tasks that the background thread runs through
    queue: Arc<Mutex<TaskSchedulerQueueState>>,
}

impl TaskScheduler {
    /// Adds a new task to the scheduler and returns the [TaskID] that is used to notify the
    /// scheduler that the task has an update waiting.
    ///
    /// ## Parameters
    ///
    /// 'closure' - The task that should be executed.
    pub fn add(
        &self,
        closure: Box<dyn Fn() + Sync + Send>,
    ) -> (Sender<TaskID>, TaskID) {
        let result = TaskID::new();
        {
            let guard = self.queue.lock();

            let mut map = guard.unwrap_or_else(|err| err.into_inner());
            map.ready_queue.insert(result.clone(), closure);
        }

        (self.sender_template.clone(), result)
    }

    /// Creates the background task update thread
    fn create_thread<F: Fn() + Send + 'static>(f: F) -> JoinHandle<()> {
        thread::spawn(f)
    }

    /// Creates a new [TaskScheduler] instance
    ///
    /// This creates a new background thread that waits for [TaskID]s to be received. Once a
    /// [TaskID] is received
    ///
    /// ## Parameters
    ///
    /// * 'processing_rate_in_hz' - The rate at which tasks should be processed.
    pub fn new(processing_rate_in_hz: i32) -> Self {
        let (s, r) = crossbeam_channel::unbounded();

        let queue = Arc::new(Mutex::new(TaskSchedulerQueueState::new()));
        let queue_copy = queue.clone();

        let background_runner = Self::create_thread(move || {
            let internal_queue = queue_copy.clone();
            let receiver = r.clone();
            Self::run(internal_queue, receiver, processing_rate_in_hz);
        });

        Self {
            sender_template: s,
            background_runner,
            queue,
        }
    }

    /// Runs the task processing.
    fn run(
        queue: Arc<Mutex<TaskSchedulerQueueState>>,
        receiver: Receiver<TaskID>,
        rate_in_hz: i32,
    ) {
        let sleep_time_in_millis = ((1.0 / (rate_in_hz as f64)) * 1000.0) as u64;
        loop {
            let is_cancelled: bool;
            {
                let arc_lock = queue.lock().unwrap_or_else(|err| err.into_inner());
                is_cancelled = arc_lock.cancelled;
            }

            if is_cancelled {
                break;
            }

            // check the receiver
            let result = receiver.try_recv();
            if result.is_ok() {
                let id = result.unwrap();

                // unwrap the hashmap and see if we have the ID
                let func: Option<&Box<dyn Fn() + Sync + Send>>;
                {
                    let map = queue.lock().unwrap_or_else(|err| err.into_inner());
                    func = map.ready_queue.get(&id);

                    match func {
                        Some(f) => {
                            f();
                        }
                        None => {
                            // The ID didn't exist in our map, but we did have an ID, so we just continue
                            // and go around the loop again to see if there's another ID waiting
                        }
                    };
                }
            } else {
                // There was nothing in the channel, so we wait our normal wait time.
                // This is ugly and there should be a better way of doing this ... Maybe async?
                //
                // In order to do this right we should really count how many milliseconds have past since the
                // last time we slept(??) and then set our duration - wake time (give or take)
                thread::sleep(Duration::from_millis(sleep_time_in_millis));
            }
        }

        // Exit because we're done
    }
}

Because you defined create_thread to take F: Fn(), which means it takes a callable that can be called multiple times. As such, you can't move out of captured state within the closure, because that state would then be invalid the second time the closure is invoked.

You can solve this by either cloning each time inside the closure (which is what you're currently doing), or by using F: FnOnce() instead to indicate the callable can only be invoked once.

Edit: you can tell this is the case by looking at the definitions of the traits. Fn::call takes &self—calling requires a simple borrow which can be re-used. FnOnce::call_once takes self—calling requires consuming the callable, which prevents using it a second time.
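A minimal, self-contained sketch of that difference (the names call_twice and call_once are made up for illustration):

fn call_twice(f: impl Fn()) {
    // `Fn` is called through a borrow, so nothing stops us calling it again.
    f();
    f();
}

fn call_once(f: impl FnOnce()) {
    // `FnOnce` consumes `f`; a second `f()` here would not compile.
    f();
}

fn main() {
    let data = String::from("captured");

    // Only borrows `data`, so it implements `Fn` (and `FnMut` and `FnOnce`).
    call_twice(|| println!("{data}"));

    // Moves `data` out of its captured state when called, so it only
    // implements `FnOnce`; passing it to `call_twice` would be rejected.
    call_once(move || drop(data));
}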


Ok so if I understand this correctly, this happens because every time I invoke the function I will need to own the captured state. And that state is removed from availability when you start invoking the function. So multiple invocations are 'stand alone' and need to own their specific state? Is that correct?

Your point about Fn::call taking a reference makes me think that I should make the Self::run function also take references, but then those might go out of scope and not be valid?

Also, given that there is only 1 background thread that executes the closure once (and then exits, even if the time to get to the exit is very long), I assume I could use FnOnce?

I'm not sure I understand your understanding.

When you call a closure via the Fn() trait, the closure's code gets an immutable borrow of the captured state. The reason you can't pass queue_copy directly is that this would require moving the captured value out of the closure and into run, which would leave a "hole" in the closure's state. Rust won't let you do this in general (you cannot move out through a borrow, immutable or otherwise). For closures this is important because a Fn() closure can be called more than once.

When you call a closure via the FnOnce() trait, the closure's code consumes the captured state, so you can do whatever you want with it, including moving it somewhere else.

It's no different to having a non-Copy type T and what you can and can't do with it in a function f(v: &T) versus g(v: T).
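A tiny sketch of that analogy, using String as the non-Copy type:

fn f(v: &String) {
    // Through a borrow we can read the value, but not move out of it:
    // let owned = *v; // error[E0507]: cannot move out of `*v`
    println!("borrowed: {v}");
}

fn g(v: String) {
    // We own `v`, so we are free to move it somewhere else.
    let owned = v;
    println!("owned: {owned}");
}

fn main() {
    let s = String::from("hello");
    f(&s); // `s` is still usable afterwards
    g(s);  // `s` is moved here and cannot be used again
}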

Looks like it should work. So long as that reference is "anchored" by an Arc in the thread, it should be fine.
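A self-contained sketch of that "anchored" idea, using simpler types than your code: the closure owns the Arc and only lends a reference into the call.

use std::sync::{Arc, Mutex};
use std::thread;

// The worker only borrows the shared state for the duration of the call.
fn process(shared: &Mutex<Vec<i32>>) {
    shared.lock().unwrap().push(1);
}

fn main() {
    let shared = Arc::new(Mutex::new(Vec::new()));
    let shared_for_thread = Arc::clone(&shared);

    let handle = thread::spawn(move || {
        // `shared_for_thread` is owned by the closure, so it anchors the data
        // for as long as the thread runs; `process` just borrows it.
        process(&shared_for_thread);
    });

    handle.join().unwrap();
    println!("{:?}", shared.lock().unwrap());
}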

That was what I did with your code. I commented out the .clone() inside the closure, verified it didn't compile, then changed F: Fn() to F: FnOnce() and verified that it did compile.
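In terms of your posted code, that change amounts to something like this (a sketch of the two edits, not the whole file):

// The bound changes from `Fn()` to `FnOnce()`:
fn create_thread<F: FnOnce() + Send + 'static>(f: F) -> JoinHandle<()> {
    thread::spawn(f)
}

// ...and in `new`, the captured values can be moved straight into `run`,
// with no clone inside the closure:
let background_runner = Self::create_thread(move || {
    Self::run(queue_copy, r, processing_rate_in_hz);
});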


Correct.

Not quite. The state is only removed when something in the closure body moves it. Borrowing it, for example to make a local clone that is then moved, is fine. This is why your current solution works.

Yes, that is probably the most appropriate solution for this case.


Ok, I understand now. Thanks for your explanation. I updated the code to use FnOnce. In addition, I need to learn to use references a lot more; often there is no need to move the state. Guess that is my .NET mental model coming through.

The function traits form a hierarchy:

  • FnOnce is implemented by every function-like type. After all, they must be able to be called at least once.
  • FnMut is implemented by functions that may be called multiple times and may mutate their environment.
  • Fn is implemented by functions that may be called multiple times and do not mutate their environment.

This means that FnOnce is the most flexible to the user and it's the most restrictive to the implementor. Conversely, Fn is the most flexible to the implementor and it's the most restrictive to the user.

Therefore, in an interface, you should never gratuitously default to Fn. That should be the last choice. You should try to make your interfaces work with FnOnce, and if that's not possible, FnMut.

Requiring Fn is rarely justified; it's usually only an artefact of needing to manage concurrency (shared mutability, reference counting, etc.).
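A short sketch of that flexibility, with made-up names: an FnOnce bound accepts every closure, while a Fn bound rejects any closure that consumes its captures.

fn takes_fn_once(f: impl FnOnce()) {
    f();
}

fn takes_fn(f: impl Fn()) {
    f();
}

fn main() {
    let s = String::from("hello");

    // A closure that only borrows implements Fn, FnMut and FnOnce,
    // so it can be passed to either bound.
    takes_fn(|| println!("{s}"));
    takes_fn_once(|| println!("{s}"));

    // A closure that consumes its capture only implements FnOnce.
    takes_fn_once(move || drop(s));
    // takes_fn(move || drop(s)); // error: this closure implements `FnOnce`, not `Fn`
}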


Thanks. That's useful information. I will keep this in mind when I am working with Fn type definitions. Thanks again for helping me get better at coding in Rust 🙂
