std::marker::Sync error for struct

Hi, I am playing around with a design, mainly as a learning exercise, where I can string some elements together and each one does a little manipulation on some data before the next is allowed to do its processing.

I have a simple test case though that I can't get going.
The errors are

92 | thread::spawn(move || {
| ^^^^^^^^^^^^^ the trait std::marker::Sync is not implemented for element::Element

The code is below.

I'm not sure why this is, as I believed the tuple of (Box<Element>, Arc<AtomicUsize>) could be passed across threads. I must be wrong, though.

How could I call the run method of each element in the vector on a different thread?

Thanks.

type PipelineResult<T> = Result<T, PipeLineError>;
type PipeLineStreamFormat = (String, String);


pub trait Element {
    fn run(&self, position : Arc<AtomicUsize>);
}


pub struct TestElement{
 
}

impl TestElement {
    pub fn new() -> Self {
        TestElement{
        }
    }
}


impl Element for TestElement {

    fn run(&self, position :  Arc<AtomicUsize>) {
        loop {
            println!("Hello");
        }
    }
}


pub struct Pipeline {

    elements : Vec<(Box<Element>, Arc<AtomicUsize>)>,
    data_queue : Vec<PipeLineStreamFormat>,

}

impl Pipeline {

    pub fn new(name: String) -> Self {
        Pipeline{
            elements : Vec::new(),
            data_queue : Vec::new(),
        }
    }

    pub fn add_element(&mut self, element: Box<Element>) -> PipelineResult<()> {
        self.elements.push((element, Arc::new(AtomicUsize::new(0))));
        Ok(()) 
    }

    pub fn run(&self) {

        loop {

            let e = &self.elements[0];

            thread::spawn(move || {
                e.0.run(e.1);
            });
        }
    }
}


fn main() {

    let mut pipeline = Pipeline::new("example pipeline".to_string());
    let e1 = TestElement::new();
    pipeline.add_element(Box::new(e1)).unwrap();

    pipeline.run();
}

There are a couple of significant issues with your code:

  1. Element is a trait, and the compiler doesn't know whether its implementors can be moved to another thread - you need to indicate that it's also Send: trait Element: Send
  2. You're trying to send a reference to the thread by virtue of &self.elements[0]. A shared reference can only be sent to another thread if the thing it points to is Sync, which is why the error mentions Sync. You'll also get lifetime issues, because the Pipeline that owns the Vec may not outlive the spawned threads, and the threads would then be referencing freed/dropped memory.

What you probably want is something like this (trimmed down version of your code, for illustration purposes):

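use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex};
use std::thread;

trait Trait: Send {
    fn run(&self, a: Arc<AtomicUsize>);
}

struct Pipeline {
    // `dyn` marks the trait object; current Rust editions require it
    elements: Vec<(Arc<Mutex<dyn Trait>>, Arc<AtomicUsize>)>,
}

fn main() {
    let p = Pipeline { elements: vec![] };
    for e in &p.elements {
        // clone both Arcs so the closure owns its own handles
        let t = e.0.clone();
        let a = e.1.clone();
        thread::spawn(move || {
            t.lock().unwrap().run(a);
        });
    }
}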

First, the Element ownership needs to be shared across threads - the Pipeline's vec is no longer the sole owner; this is done via putting it into an Arc. Second, if you want to keep Element (or Trait in my example) Send only, you need to wrap access to it via some mutual exclusion - I'm using a Mutex here for simplicity. Third, you need to clone() the two parts you're moving into the closure - the Arc keeping the Mutex<Trait> alive and the Arc keeping the AtomicUsize alive.

Let me know if the above makes sense.

Thanks, I think that makes sense. I can't compile yet, but I'm going to spend a little time later working out why, so as to learn by failure :)

I'm sure I will have a follow-up.

Thanks

Actually, that wasn't too bad. I still need to do some reading on Mutex and Arc.

I'm not 100% sure why the Mutex is needed.

One more issue.

My add to pipeline method is like

pub fn add_element(&mut self, element: Arc<Mutex<Element>>) -> PipelineResult<()> {
        self.elements.push(  (  element, Arc::new(AtomicUsize::new(0))   )   );
        Ok(())
    }

So clients would have to call like

let e1 = TestElement::new();
pipeline.add_element(Arc::new(Mutex::new(e1))).unwrap();

Which is not too friendly.

Do you know of a way to pass a reference to an Element?

Doing things like

pub fn add_element(&mut self, element: &Element) -> PipelineResult<()> {
        self.elements.push(  (  Arc::new(Mutex::new(element)), Arc::new(AtomicUsize::new(0))   )   );
        Ok(())
 }

results in
the trait element::Element is not implemented for &element::Element

Thanks

Here's code you should be able to copy/paste into Rust playground (or elsewhere) that will run:

use std::sync::{Arc, Mutex};

use std::sync::atomic::*;
use std::thread;
use std::result::Result;

#[derive(Debug)]
pub struct PipeLineError;

type PipelineResult<T> = Result<T, PipeLineError>;
type PipeLineStreamFormat = (String, String);


pub trait Element: Send {
    fn run(&self, position : Arc<AtomicUsize>);
}


pub struct TestElement{
 
}

impl TestElement {
    pub fn new() -> Self {
        TestElement{
        }
    }
}


impl Element for TestElement {
    fn run(&self, position :  Arc<AtomicUsize>) {
        for _ in 0 .. 2 {
            println!("Hello");
        }
    }
}


pub struct Pipeline {
    elements : Vec<(Arc<Mutex<dyn Element>>, Arc<AtomicUsize>)>,
    data_queue : Vec<PipeLineStreamFormat>,
}

impl Pipeline {
    pub fn new(name: String) -> Self {
        Pipeline{
            elements : Vec::new(),
            data_queue : Vec::new(),
        }
    }

    pub fn add_element<T: Element + 'static>(&mut self, element: T) -> PipelineResult<()> {
        self.elements.push((Arc::new(Mutex::new(element)), Arc::new(AtomicUsize::new(0))));
        Ok(()) 
    }

    pub fn run(&self) -> Vec<thread::JoinHandle<()>>{
        let mut handles = Vec::with_capacity(self.elements.len());
        for e in &self.elements {
            let elem = e.0.clone();
            let c = e.1.clone();
            handles.push(thread::spawn(move || {
                elem.lock().unwrap().run(c);
            }));
        }
        handles
    }
}


fn main() {
    let mut pipeline = Pipeline::new("example pipeline".to_string());
    pipeline.add_element(TestElement::new()).unwrap();
    let handles = pipeline.run();
    println!("Pipeline started - waiting for {} threads to finish", handles.len());
    for (i, h) in handles.into_iter().enumerate() {
        h.join().unwrap(); // propagate a panic from the element's thread
        println!("Thread {} finished", i);
    }
    println!("Done");
}

You probably want the Pipeline to take ownership of the Element, no? The above code example that I pasted assumes that's what you want.

I somehow overlooked this question. The Mutex is needed because we're sharing the Element between threads through the Arc, which hands out shared references to it. That means multiple threads can access it concurrently. Since we only specified that Element is Send (not Sync), the compiler won't allow sharing the Element directly; wrapping it in a Mutex makes it Sync, because Mutex<T> is Sync whenever T is Send: the lock itself prevents concurrent access.

You can mark Element as Sync and then you could share an immutable reference (via Arc) without the Mutex.
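
To make the Sync option concrete, here is a minimal sketch, assuming every element is safe to call through a shared reference. The run_all helper and the counter update inside run are made up for illustration and are not part of the code above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Requiring Sync as well as Send lets threads share `&self` directly,
// so no Mutex is needed around the element.
pub trait Element: Send + Sync {
    fn run(&self, position: Arc<AtomicUsize>);
}

pub struct TestElement;

impl Element for TestElement {
    fn run(&self, position: Arc<AtomicUsize>) {
        // Record one unit of progress (illustrative only).
        position.fetch_add(1, Ordering::SeqCst);
    }
}

// Hypothetical helper: runs every element on its own thread, joins them,
// and returns the final counter values.
pub fn run_all(elements: &[(Arc<dyn Element>, Arc<AtomicUsize>)]) -> Vec<usize> {
    let handles: Vec<_> = elements
        .iter()
        .map(|(elem, pos)| {
            let elem = Arc::clone(elem);
            let pos = Arc::clone(pos);
            thread::spawn(move || elem.run(pos))
        })
        .collect();
    for h in handles {
        h.join().expect("element thread panicked");
    }
    elements
        .iter()
        .map(|(_, pos)| pos.load(Ordering::SeqCst))
        .collect()
}

fn main() {
    let elements = vec![
        (Arc::new(TestElement) as Arc<dyn Element>, Arc::new(AtomicUsize::new(0))),
        (Arc::new(TestElement) as Arc<dyn Element>, Arc::new(AtomicUsize::new(0))),
    ];
    println!("{:?}", run_all(&elements));
}
```

The trade-off is that run only ever gets &self, so any state an element wants to change has to live in something like an atomic or its own lock.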

Hope that helps.

Thank you so much. I didn't realise you could call the generic function without the type, but it makes sense now.

Could you explain the <T: Element + 'static> part?
I have only seen 'static in relation to strings so far. Is it adding a lifetime to the Element?

Trait + 'static, or more generally, Trait + 'a (where 'a is some lifetime) is a lifetime bound on the type implementing the trait (or on the trait object). What it says is that "any instance of Trait will not have any internal references to anything that lives for less than 'a". Put another way, "all references, if any, inside each and every concrete Trait implementation live for at least 'a".

When you box a trait, such as Mutex<Trait>, the true type is erased - the compiler does not know what concrete type is there. But it still needs to reason about the lifetimes of any references the real type may hold. That's what trait object bounds like the above indicate.
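
As a small illustration of what a 'static bound accepts and rejects, here is a sketch. The requires_static function and the Owned and Borrowed types are hypothetical, made up purely for this example.

```rust
// Hypothetical helper: only compiles for types that satisfy `T: 'static`.
fn requires_static<T: 'static>(value: T) -> T {
    value
}

// Holds no references at all, so it is 'static.
struct Owned {
    name: String,
}

// Holds a reference, so it is only 'static when 'a is 'static.
struct Borrowed<'a> {
    name: &'a str,
}

fn main() {
    // Fine: an owned String contains no borrowed data.
    let owned = requires_static(Owned { name: String::from("fake_source") });

    // Fine: a string literal is a &'static str.
    let literal = requires_static(Borrowed { name: "test_element" });

    // Rejected if uncommented: the reference points at a local variable,
    // which does not live for the whole program.
    let local = String::from("temporary");
    // let bad = requires_static(Borrowed { name: &local });

    println!("{} {} {}", owned.name, literal.name, local);
}
```

So 'static does not mean the value itself has to live forever, only that it carries no references that could expire earlier.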

For this particular case, if we look at the signature of thread::spawn:

pub fn spawn<F, T>(f: F) -> JoinHandle<T> 
where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static

We can see that the closure object has to be 'static; that implies that anything captured in the closure body has to be as well. In your example, we effectively capture an Element by virtue of the Arc and Mutex - the innards are an Element. So we need to indicate that Element is also 'static.

The reason thread::spawn wants a 'static bound is because it's trying to ensure that anything captured does not hold any references whose lifetimes are tied to the current thread's execution stack. Since the spawned thread may execute when the calling thread has exited (and thus freed some memory), we need to be sure that this cannot happen. 'static as you probably know means "lifetime of the program", so those references are available to all threads.
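
Here is a sketch of the situation that bound guards against. The start function is hypothetical; the point is only that the spawned thread can outlive the stack frame that created it, so the closure has to own its data.

```rust
use std::thread::{self, JoinHandle};

// Hypothetical function: the spawned thread may still be running after
// `start` returns, so the closure must own everything it uses.
fn start(label: &str) -> JoinHandle<String> {
    // Copy the borrowed data into an owned String first.
    let owned = label.to_string();
    // Capturing `label` itself would be rejected, because it is a borrow
    // that is not guaranteed to be 'static.
    thread::spawn(move || format!("running {}", owned))
}

fn main() {
    let handle = {
        let name = String::from("test_element");
        start(&name)
    }; // `name` is dropped here, possibly before the thread has finished

    println!("{}", handle.join().unwrap());
}
```

Cloning an Arc before the move, as in the pipeline code, is the same idea: the closure gets its own owned handle instead of a borrow.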

Great explanation. Thanks

Hi, I wondered if I could get some follow-up advice. I have been playing with the code here.

My test repo is https://github.com/glennpierce/datapipeline

After some thinking I decided to associate a SyncSender and a Receiver with each element in my pipeline. I want data to flow across the pipeline, i.e. one element sends to a channel and the next element receives it; that element may then send on its own channel, and so the data travels down through the elements. I think this should be performant enough, but I have no metrics yet.
I'm not sure how else I could synchronise the data. One element must not process beyond the previous element.

My trouble is again with threads (Sync) and lifetimes.

I would like a client to use it like this:

let mut fake_src = FakeSourceElement::new("fake_source");
let mut test_element = TestElement::new("test_element");
let mut pipeline = Pipeline::new("example pipeline".to_string());

pipeline.add_element(&fake_src).unwrap();
pipeline.add_element(&test_element).unwrap();

pipeline.attach_output_pad_to_input_pad(&fake_src, "fake_source_output",
                                        &test_element, "test_element_input").unwrap();


let handles = pipeline.run();

This meant storing references to the Elements in the pipeline, i.e. the pipeline would not own the elements.
I almost got this going, but when it came to spawning the run for each element I got Sync errors: the pipeline has a field containing references to Elements, Elements are not Sync, and because I was spawning inside a member function, the pipeline was captured through self.

(Hopefully my understanding of this is correct :)

Ideally I would have liked to prevent the field of Elements from being passed into the thread spawn closure, but I was not sure how.
Is there some way to run a function in a thread rather than a closure?

This code is on the branch
https://github.com/glennpierce/datapipeline/tree/references/src

Second Try...

After my failure here I went back to the previous code, where the pipeline owns the Elements and they are wrapped
in a Mutex and an Arc when added.
This meant clients could no longer call functions like

pipeline.attach_output_pad_to_input_pad(&fake_src, "fake_source_output",
                                        &test_element, "test_element_input").unwrap();

as the pipeline had taken ownership.

I changed this function to

pipeline.attach_output_pad_to_input_pad("fake_source", "fake_source_output",
                                        "test_element", "test_element_input").unwrap();

I now look up the elements I need in a HashMap. However, as they are wrapped in an Arc and a Mutex internally, I get lifetime issues when I have private functions that return Elements or fields of Elements.

I.e. this function causes the following problem:

fn find_element_pad<'a>(&'a self, name : &str, pad_name : &str) -> PipelineResult<&ElementPad> {

        let element = try!(self.find_element(name)).clone().lock().unwrap();

        let pad = try!(element.get_pad(pad_name));
        
        Ok(pad)
    }

50 |         let element = try!(self.find_element(name)).clone().lock().unwrap();                                                                
   |                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                - temporary value only lives until here                          
   |                       |                                                                                                                     
   |                       does not live long enough

I can understand why, as Arc is meant to prevent passing the protected data out of its scope, which is fair enough, but
this kind of issue and struggling makes me think I am off the right track and this is not idiomatic Rust.

Am I missing something obvious?

I am thinking the way forward is the first method, finding some way to limit the scope that gets passed into the threads.

Thanks

Sorry for rambling !

So I think you should decide how you want the data to be shared amongst the stages of the pipeline (i.e. between the threads). If you can move ownership stage to stage, then your data just has to be Send. If you want to share references, then either you need to mark your data as Sync or share it via something that makes it Sync, like Mutex.
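
If you go the "share references" route with a pipeline that does not own its elements, one option is scoped threads. This is only a sketch, assuming a toolchain that has std::thread::scope (stable since Rust 1.63) and assuming every element can be Sync. The Pipeline below, the steps field, and the changed run signature are simplified stand-ins, not the code from the repo.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Sync is required because several threads hold `&dyn Element` at once.
pub trait Element: Sync {
    fn run(&self, position: &AtomicUsize);
}

pub struct TestElement {
    steps: usize, // hypothetical field, used only to make the output visible
}

impl Element for TestElement {
    fn run(&self, position: &AtomicUsize) {
        for _ in 0..self.steps {
            position.fetch_add(1, Ordering::SeqCst);
        }
    }
}

// Hypothetical pipeline that borrows its elements instead of owning them.
pub struct Pipeline<'a> {
    elements: Vec<(&'a dyn Element, AtomicUsize)>,
}

impl<'a> Pipeline<'a> {
    pub fn new() -> Self {
        Pipeline { elements: Vec::new() }
    }

    pub fn add_element(&mut self, element: &'a dyn Element) {
        self.elements.push((element, AtomicUsize::new(0)));
    }

    // Every thread spawned inside the scope is joined before `scope`
    // returns, so the closures may borrow instead of needing 'static data.
    pub fn run(&self) -> Vec<usize> {
        thread::scope(|s| {
            for (element, position) in &self.elements {
                s.spawn(move || element.run(position));
            }
        });
        self.elements
            .iter()
            .map(|(_, p)| p.load(Ordering::SeqCst))
            .collect()
    }
}

fn main() {
    let fake_src = TestElement { steps: 2 };
    let test_element = TestElement { steps: 3 };
    let mut pipeline = Pipeline::new();
    pipeline.add_element(&fake_src);
    pipeline.add_element(&test_element);
    println!("{:?}", pipeline.run());
}
```

The catch is that run blocks until every element has finished, so it cannot hand join handles back to the caller the way the earlier owning version does.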

If you can be sure that sharing immutable references across threads is safe (typically is unless you allow interior mutation), then marking it Sync should be fine. If you wrap in a Mutex, however, then you can only access the data while the MutexGuard is still live, as it seems you've found out.
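
For the find_element_pad case specifically, two common ways around the guard's lifetime are to return an owned copy, or to run a caller-supplied closure while the lock is held. The sketch below uses made-up, simplified Element and ElementPad types and Option instead of your PipelineResult, so treat the names as assumptions.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-ins for the real types in the repo.
#[derive(Clone, Debug, PartialEq)]
pub struct ElementPad {
    pub name: String,
}

pub struct Element {
    pads: Vec<ElementPad>,
}

pub struct Pipeline {
    elements: HashMap<String, Arc<Mutex<Element>>>,
}

impl Pipeline {
    // Option 1: return an owned clone, so nothing borrowed from the
    // MutexGuard escapes the function.
    pub fn find_element_pad(&self, name: &str, pad_name: &str) -> Option<ElementPad> {
        let element = self.elements.get(name)?.lock().unwrap();
        let pad = element.pads.iter().find(|p| p.name == pad_name).cloned();
        pad
    }

    // Option 2: run the caller's closure while the lock is still held and
    // return only the closure's result.
    pub fn with_element_pad<R>(
        &self,
        name: &str,
        pad_name: &str,
        f: impl FnOnce(&ElementPad) -> R,
    ) -> Option<R> {
        let element = self.elements.get(name)?.lock().unwrap();
        let result = element.pads.iter().find(|p| p.name == pad_name).map(f);
        result
    }
}

fn main() {
    let pad = ElementPad { name: "test_element_input".to_string() };
    let mut elements = HashMap::new();
    elements.insert(
        "test_element".to_string(),
        Arc::new(Mutex::new(Element { pads: vec![pad] })),
    );
    let pipeline = Pipeline { elements };

    println!("{:?}", pipeline.find_element_pad("test_element", "test_element_input"));
    let len = pipeline.with_element_pad("test_element", "test_element_input", |p| p.name.len());
    println!("{:?}", len);
}
```

Cloning is simplest when the pad is cheap to copy; the closure form avoids the copy but keeps the element locked for as long as the closure runs.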

The pipeline approach can be achieved by using channels, which I think you may already be using. Each stage has a (rx, tx) pair on which it receives input from previous stage and sends data to next stage, respectively. Of course the first stage may have just a tx and last stage just a rx.
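
A rough sketch of that shape, using bounded std::sync::mpsc channels so that a stage blocks until the next one has room. The stage functions (source, uppercase, run_pipeline) and the data they produce are invented for illustration; only the (String, String) item type comes from your code.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

type PipeLineStreamFormat = (String, String);

// Hypothetical first stage: it only has a tx end.
fn source(tx: SyncSender<PipeLineStreamFormat>, count: usize) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for i in 0..count {
            // `send` moves ownership of the item to the next stage.
            if tx.send((format!("key{}", i), String::from("raw"))).is_err() {
                break; // the downstream stage has gone away
            }
        }
        // `tx` is dropped here, which ends the downstream receive loop.
    })
}

// Hypothetical middle stage: receives from the previous stage and sends
// the transformed item to the next one.
fn uppercase(
    rx: Receiver<PipeLineStreamFormat>,
    tx: SyncSender<PipeLineStreamFormat>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for (key, value) in rx {
            if tx.send((key, value.to_uppercase())).is_err() {
                break;
            }
        }
    })
}

// Hypothetical driver: the calling thread acts as the last stage (rx only).
fn run_pipeline(count: usize) -> Vec<PipeLineStreamFormat> {
    // A capacity of 1 keeps each stage at most one item ahead of the next.
    let (tx1, rx1) = sync_channel(1);
    let (tx2, rx2) = sync_channel(1);
    let handles = vec![source(tx1, count), uppercase(rx1, tx2)];
    let out: Vec<_> = rx2.iter().collect();
    for h in handles {
        h.join().unwrap();
    }
    out
}

fn main() {
    for (key, value) in run_pipeline(3) {
        println!("{} -> {}", key, value);
    }
}
```

Because each item is moved through the channels, the item type only needs to be Send, and each stage is owned by exactly one thread, so no Mutex is involved.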

Ownership of the data should transfer from Element to Element. So Send should be enough.
I will play with the pipeline owning the Elements and try that route.

Thanks