Sending to channels from threads

Hi I am new to Rust and I am currently trying to implement a system where a collection of elements
can produce , filter and consume data from each other in a pipeline. (Think a simple GStreamer)

So each element will have a send and recv channel. So a file source element as shown below will read a file and send the data into the channel in a separate thread. Eventually I will connect another element that will read from this channel in a different thread.

However, I am having trouble sending to the channels in threads.
The errors I am getting are

the trait std::marker::Sync is not implemented for std::sync::mpsc::Sender<std::string::Stri ng>

I have read some similar threads but am still having trouble.
I am guessing my design is wrong here.

Below I am just showing a simple element sending for simplicity.

Thanks

struct Element {
    comm : Arc<(Sender<String>, Receiver<String>)>,
}

impl Element {
    pub fn new(name : String) -> Self {
        Element{
            comm : Arc::new(mpsc::channel()),
        }
    }
}

struct FileSourceElement {
    base : Element,
    file: File,
}

impl FileSourceElement {
    fn new(name: String, file : File) -> Self {
        let mut element = FileSourceElement { base : Element::new(name), file : file };
        element
    }

    fn run(&self) {
        let mut local_comm = self.base.comm.clone();

        // Spawn off an expensive computation
        thread::spawn(move|| {

            // Read from file and stream data to the output pad.
            let mut reader = BufReader::new(&self.file);

            for line in reader.lines() {
                let l = line.unwrap();
                local_comm.0.send(l).unwrap();
             }

        });
    }
}


#[cfg(test)]
mod tests {
    #[test]
    fn it_works() {
        use ::Element;
        use ::FileSourceElement;
        use std::fs::File;

        match File::open("data/test.txt") {
            Ok(file) => {
              let file_source_element = FileSourceElement::new("My Source".to_string(), file);
              file_source_element.run();
            },
            Err(e) => {
                // fallback in case of failure.
                // you could log the error, panic, or do anything else.
                println!("{}", e);
            }
        };
    }
}

Sync is the trait for types which can be safely shared between threads, but Sender only implements Send and not Sync. You are trying to share the Sender between the threads using the Arc<(Sender, Receiver)>. Try removing the Arc and using the following:

let mut local_comm = self.base.comm.0.clone();
thread::spawn(move|| {
  let mut reader = BufReader::new(&self.file);
  for line in reader.lines() {
    let l = line.unwrap();
    local_comm.send(l).unwrap();
  }
});

This will move (Send) the copy of the Sender to the new thread rather than trying to share access between multiple threads which is explicitly not allowed. Check out the concurrency section of the book for more details.

Thanks

Originally I had the code without the Arc similar to what you suggested. With your changes I get

^^^^^^^^^^^^^ the trait std::marker::Sync is not implemented for std::sync::mpsc::Receiver<std::string::String>

This issue is because you're trying to share &self (i.e. &FileSourceElement) across the threads, since you're taking &self.file in the closure. The compiler is then trying to see if FileSourceElement is Sync by looking at the type recursively - it eventually sees the Receiver and Sender tuple, and determines it's not Sync because those aren't.

If you can avoid accessing &self in the closure, it should work out. One quick-and-dirty (i.e. just for illustration) way is to do:

let f = self.file.try_clone().unwrap();
thread::spawn(move || {
    ...
    let mut reader = BufReader::new(f);
    ...
});
1 Like

Yep that did it. Thanks.
So many little tricks to Rust.

Rust is "front-loaded" in terms of learning curve. It's like learning to drive - some languages will let you fire up the car and hit the road, letting you wreak havoc as you learn "on the fly"; Rust is like a strict teacher that will make sure you have basic skills/knowledge before even taking the car for a spin in an empty parking lot. But, once you're on the road, it's a more pleasant and safe drive.

4 Likes