Calling functions in another thread

I am writing an extension for an existing system in Rust. This system requires I do all communication with the original system in a single thread (the thread from which I was originally called).

I now want to make my code multi-threaded. I need to (occasionally) make calls back to the host program from other threads. Therefore I think I want that original thread to enter a loop where other threads can make requests for it to call into the child process until a result has been produced, then it returns (does that make sense?)

Is there any standard / nice way of doing this? I imagine other systems have similar issues (I believe Python, and Julia, both have requirements that you should only call back from the thread you were called from?)

You are probably looking for message passing channels. You can find some inspiration in this article, though your case is a bit different:

  1. You are not spawning a new thread, but using an existing thread as the actor.
  2. You want to use non-async message passing channels. (e.g. std::sync::mpsc or the crossbeam crate)
5 Likes

Example:

fn main ()
{
    // Spins a background thread which is the one in charge of
    // spawning and communicating with the child process.
    let worker = ThreadWorker::new(|| {
        start_child_process()
    });
    // From any thread, you can query work from that thread thanks to `worker.run()`.
    // The `worker` (handle) can be cheaply `.clone()`d for multiple ownership
    worker.run(|| {
        compute_stuff()
    });
}

// Implementation

use ::std::{sync::{Arc, mpsc}, thread};

#[derive(Clone)]
pub
struct ThreadWorker {
    task_sender: mpsc::Sender<Box<dyn FnOnce() + Send>>,
    join_handle: Arc<AutoJoinHandle>,
}

impl ThreadWorker {
    pub
    fn new<LoopState> (
        thread_prelude: impl 'static + Send + FnOnce() -> LoopState,
    ) -> Self
    {
        let (task_sender, to_be_received_tasks) = mpsc::channel();
        let _: mpsc::Sender<Box<dyn FnOnce() + Send>> = task_sender;
        let join_handle = Arc::new(AutoJoinHandle(Some(
            ::std::thread::spawn(move || {
                let _state = thread_prelude();
                for task in to_be_received_tasks {
                    task();
                }
            })
        )));
        Self { task_sender, join_handle }
    }

    pub
    fn run<R> (
        self: &'_ ThreadWorker,
        f: impl 'static + Send + FnOnce() -> R,
    ) -> R
    where
        R : 'static + Send,
    {
        /// Poor man's oneshot channel
        use mpsc as oneshot;
        let (sender, receiver) = oneshot::channel();
        let _ = self.task_sender.send(Box::new(move || {
            let _ = sender.send(f());
        }));
        eprintln!("Queried a task from {:?}", thread::current().id());
        receiver.recv().expect("Thread worker panicked")
    }
}

struct AutoJoinHandle<T = ()> /* = */ (
    Option<thread::JoinHandle<T>>,
);

impl<T> Drop for AutoJoinHandle<T> {
    fn drop (self: &'_ mut Self)
    {
        let _ = self.0.take().unwrap().join();
    }
}
2 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.