Run object function in different threads

I want to build an object and call its member function from different threads:

use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::sync::atomic::{AtomicBool, Ordering};

struct LogRecorder {
    run: AtomicBool,
}

impl LogRecorder {
    pub fn new() -> Self {
        LogRecorder {
            run: AtomicBool::new(false),
        }
    }

    pub fn start(&self) {
        self.run.store(true, Ordering::Relaxed);
    }

    pub fn stop(&self) {
        self.run.store(false, Ordering::Relaxed);
    }

    pub fn recording_loop(&self, channel: &str) {
        // Initialize
        while self.run.load(Ordering::Relaxed) {
            // Do Something with the channel
            println!("Recording {}...", channel);
        }
        println!("Stopped recording for {}", channel);
    }
}

fn main() {
    // Assuming utils::CHANNELS_NAMES defined somewhere
    let log_recorder = Arc::new(LogRecorder::new());

    log_recorder.start();

    let children: Vec<_> = utils::CHANNELS_NAMES.iter().map(|channel| {
        let log_recorder_clone = log_recorder.clone();
        thread::spawn(move || {
            log_recorder_clone.recording_loop(channel);
        })
    }).collect();

    thread::sleep(Duration::from_secs(10));

    log_recorder.stop();

    for child in children {
        let _ = child.join();
    }
}

I get this error:

error[E0596]: cannot borrow data in an `Arc` as mutable
  --> src/main.rs:92:13
   |
92 |             log_recorder_clone.recording_loop(channel);
   |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<LogRecorder>`

I tried making the variables mutable and adding a mutex, but I still got an error.
How can I share an object's member function between threads?


Your example runs fine on the playground.


I doubt you need it as it is; you may want to consider refactoring. If you have an object and want it to run in a separate thread, use the 'actor' macro from interthread.


use std::thread;
use std::time::Duration;
use std::sync::atomic::{AtomicBool, Ordering};

use interthread::actor;

pub struct LogRecorder;
// You could also give it a field, for example:
// channel: &'static str,

#[actor(channel=2)]
impl LogRecorder {

    pub fn new() -> Self { Self }

    fn state( opt: Option<bool>) -> bool {
        static FLAG: AtomicBool = AtomicBool::new(false);
        if let Some(b) = opt {
            FLAG.store(b,  Ordering::Relaxed);
        }
        FLAG.load(Ordering::Relaxed)
    }

    pub fn start() {
        Self::state(Some(true));
    }
    pub fn stop() {
        Self::state(Some(false));
    }
    pub fn check() -> bool {
        Self::state(None)
    }
    // 'actor' will accept only a 'static lifetime here;
    // otherwise use a String
    pub fn recording_loop(&self, channel: &'static str) {
        // Initialize
        while Self::state(None) {
            // Do Something with the channel
            println!("Recording {}...", channel);
        }
        println!("Stopped recording for {}", channel);
    }
}

// Assuming utils::CHANNELS_NAMES defined somewhere
mod utils {
    pub static CHANNELS_NAMES: [&'static str; 3] = ["chan_1", "chan_2", "chan_3"];
}


fn main() {

    LogRecorder::start();

    let _children: Vec<_> = utils::CHANNELS_NAMES.iter().map(|channel| {
        let log_recorder = LogRecorderLive::new();
        log_recorder.recording_loop(channel);
        log_recorder
    }).collect();

    thread::sleep(Duration::from_secs(2));

    LogRecorder::stop();
    
    // Sleep a bit to see that it actually
    // exits the 'recording_loop'
    thread::sleep(Duration::from_secs(1));
}

The macro will ask you to add a oneshot channel dependency to the project's Cargo.toml, and that's all. The macro itself can be exchanged for code on file, since 'interthread' is a development tool.


You are right.
I guess the bug got fixed when I simplified the example :sweat_smile:

Thanks for the check

Thanks,
I didn't know this one.
Rust is still new to me, so I'm finding my way around borrowing and macros.

So what you suggested is the better way?
Is it better by design / performance / convention?

Also, why did you remove the mutex member and use a static instead?

I don't see what benefit any sort of macro would have here. If you merely want to spawn threads, use std::thread::spawn() or std::thread::scope(). For sharing mutable data, use Arc<Mutex<_>>. No need to overcomplicate things.
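To illustrate the std::thread::scope() route, here is a minimal sketch, assuming a pared-down LogRecorder like the one in the question. The record_all helper and the channel names are hypothetical and exist only to make the example self-contained. Scoped threads may borrow the recorder directly, so no Arc is needed, and the scope joins every thread before it returns.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;

struct LogRecorder {
    run: AtomicBool,
}

impl LogRecorder {
    fn new() -> Self {
        LogRecorder { run: AtomicBool::new(false) }
    }

    fn start(&self) {
        self.run.store(true, Ordering::Relaxed);
    }

    fn stop(&self) {
        self.run.store(false, Ordering::Relaxed);
    }

    fn recording_loop(&self, _channel: &str) {
        while self.run.load(Ordering::Relaxed) {
            // Stand-in for the real work on the channel.
            thread::sleep(Duration::from_millis(1));
        }
    }
}

/// Hypothetical helper: runs one loop per channel for `run_for`,
/// then stops them and returns how many loops exited.
fn record_all(channels: &[&str], run_for: Duration) -> usize {
    let recorder = LogRecorder::new();
    let finished = AtomicUsize::new(0);
    recorder.start();

    // Take plain references so the `move` closures copy the
    // references instead of moving the values themselves.
    let (recorder_ref, finished_ref) = (&recorder, &finished);

    thread::scope(|s| {
        for &channel in channels {
            s.spawn(move || {
                recorder_ref.recording_loop(channel);
                finished_ref.fetch_add(1, Ordering::Relaxed);
            });
        }
        thread::sleep(run_for);
        recorder_ref.stop();
        // All spawned threads are joined when the scope ends.
    });

    finished.load(Ordering::Relaxed)
}

fn main() {
    let done = record_all(&["chan_1", "chan_2", "chan_3"], Duration::from_millis(50));
    println!("{done} recording loops stopped");
}
```

Because only &self methods and an atomic are involved, nothing here needs a Mutex either; that would only come in once the threads share data that cannot be updated atomically.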

If you show us an example that actually fails to compile and/or achieve what you want, we can make more useful, specific suggestions.

Interthread isn't a concept in Rust; it's just a crate I'm working
on at the moment, implementing an abstraction over the Actor Model
described and implemented in 'Actors with Tokio' (please read the article).

Your object is a valid candidate for the 'actor' macro within the crate. It has an initiating function new and public methods with a 'self' reference receiver (&self).
Initially, I considered applying the macro directly to it, but the presence of the recording_loop method, which is essentially a while loop, posed a challenge.
Until 'recording_loop' returns, the model remains unresponsive, even if 'stop' is called.

I was experimenting with it (it's a valid target), aiming to maintain a similar output and a structure close to the original. As you can see, the result is not impressive in the least, although I included the modified code in my initial response as an alternative way of achieving the same thing.

But they are not equivalent. Afterwards I realised that I had failed to consider at least two critical aspects that make your approach superior:

  1. Invoking stop() on an instance of your version concludes the story: the spawned recording_loop returns (). There's no need for additional Drops or any kind of closing.

In contrast, when stop() is invoked on my version, the instances and threads remain active even after recording_loop returns, waiting for new messages. This is probably
not what you expect in your scenario. You would have to drop the array.

  2. Consider a scenario with multiple sets of channels (I'm referring to CHANNELS_NAMES), with a new instance of LogRecorder created for each set. If these instances operate concurrently and stop() is invoked on one of them, it halts only the corresponding set of spawned recording_loop calls.
    This is your case.

However, in my version, calling stop() will terminate all sets simultaneously.
Here, the field in the struct definition that you've implemented makes a lot of sense.
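The second point can be shown without any threads at all. The sketch below is only an illustration: FieldRecorder, StaticRecorder, SHARED_RUN and the two demo functions are hypothetical names, not part of either version above. It compares a flag stored in a struct field with a flag stored in a single static.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Variant A: the flag is a field, so every recorder owns its own flag.
struct FieldRecorder {
    run: AtomicBool,
}

impl FieldRecorder {
    fn new() -> Self {
        FieldRecorder { run: AtomicBool::new(false) }
    }
    fn start(&self) {
        self.run.store(true, Ordering::Relaxed);
    }
    fn stop(&self) {
        self.run.store(false, Ordering::Relaxed);
    }
    fn is_running(&self) -> bool {
        self.run.load(Ordering::Relaxed)
    }
}

// Variant B: one static flag shared by every recorder.
static SHARED_RUN: AtomicBool = AtomicBool::new(false);

struct StaticRecorder;

impl StaticRecorder {
    fn start(&self) {
        SHARED_RUN.store(true, Ordering::Relaxed);
    }
    fn stop(&self) {
        SHARED_RUN.store(false, Ordering::Relaxed);
    }
    fn is_running(&self) -> bool {
        SHARED_RUN.load(Ordering::Relaxed)
    }
}

/// Starts two field-based recorders, stops only the first,
/// and reports whether each one is still running.
fn field_demo() -> (bool, bool) {
    let (set_a, set_b) = (FieldRecorder::new(), FieldRecorder::new());
    set_a.start();
    set_b.start();
    set_a.stop();
    (set_a.is_running(), set_b.is_running())
}

/// Same steps, but with the static-backed recorders.
fn static_demo() -> (bool, bool) {
    let (set_a, set_b) = (StaticRecorder, StaticRecorder);
    set_a.start();
    set_b.start();
    set_a.stop();
    (set_a.is_running(), set_b.is_running())
}

fn main() {
    println!("field flag:  {:?}", field_demo()); // (false, true)
    println!("static flag: {:?}", static_demo()); // (false, false)
}
```

With the field, stopping set_a leaves set_b running; with the static, stopping one stops both.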

Responding to your question about the benefits of my implementation,
the answer is straightforward: there are none!

In fact I owe you an apology for the confusion caused, as your implementation is better!

If you insist on using the 'actor' macro (in this particular case), my best suggestion is to encapsulate the logic of (// Do Something with the channel) in a struct and then turn that struct into an actor. It will have its own separate thread and can be manipulated from outside with ease.

Regarding Mutex: a Mutex is a lock. It establishes order so
that only one thread can read or write the protected value at a time. Atomic types, on the other hand, need no lock: each operation is indivisible and takes an explicit memory ordering, such as the Ordering::Relaxed you used in your example.
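As a rough side-by-side, here is a sketch of the same boolean flag written once with a Mutex and once with an atomic. The function names set_with_mutex and set_with_atomic are made up for this example. In both cases another thread sets the flag and the caller reads it back after joining that thread.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// The flag lives behind a lock: every access takes the lock first.
fn set_with_mutex() -> bool {
    let flag = Arc::new(Mutex::new(false));
    let writer = Arc::clone(&flag);

    thread::spawn(move || {
        // The guard returned by lock() gives exclusive access.
        *writer.lock().unwrap() = true;
    })
    .join()
    .unwrap();

    // Copy the value out so the guard is released before returning.
    let value = *flag.lock().unwrap();
    value
}

/// The flag is an atomic: no lock, each store/load names an ordering.
fn set_with_atomic() -> bool {
    let flag = Arc::new(AtomicBool::new(false));
    let writer = Arc::clone(&flag);

    thread::spawn(move || writer.store(true, Ordering::Relaxed))
        .join()
        .unwrap();

    flag.load(Ordering::Relaxed)
}

fn main() {
    println!("mutex flag:  {}", set_with_mutex());
    println!("atomic flag: {}", set_with_atomic());
}
```

For a single bool the atomic is the lighter tool; a Mutex becomes the natural choice when several values have to change together.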

A static is initialized once, at compile time, and lives for the whole program, even though here its name is only visible inside the function. It may look as if
a new assignment to FLAG happens every time the method is invoked, but in fact the initialization
happens only once, and every later call sees the same FLAG.
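A small sketch makes this visible. The next_id function and its COUNTER are hypothetical, chosen only because a counter shows more clearly than a bool that the value survives between calls.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Returns a fresh number on every call.
fn next_id() -> usize {
    // Initialized once; this line does NOT reset the counter on each call.
    static COUNTER: AtomicUsize = AtomicUsize::new(0);
    // fetch_add returns the previous value and then increments it.
    COUNTER.fetch_add(1, Ordering::Relaxed)
}

fn main() {
    let first = next_id();
    let second = next_id();
    let third = next_id();
    // Each call continues from where the previous one left off.
    println!("{first} {second} {third}");
}
```

If the static were re-created on every call, all three values would be identical; instead each one is one higher than the last.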

The pattern is far from complicated; once you understand it,
it becomes the path of least resistance.

I should mention that using an Arc<Mutex<bool>> as a field in the struct definition (controlled from outside)
would avoid the second problem.

In conclusion, if one is not prepared to study 'Actors with Tokio' and to configure the struct for the macro, it's best not to use it at all.

Love and clean code to everyone!

P.S. Support for generic types added, enjoy interthread