Better Code Design , Better Performance base on speed, reliability, and robustness in Concurrency

I'm learning concurrency in Rust, I'm practicing with some simple codes.
Here I have a loop which generating 10 threads and sending message (tx) to another thread which receiver(rx) is there. The purpose of this code is calculating how long it will take since system runs till all messages are received in rx. program works well in this design:

use std::sync::mpsc;
use std::thread;
use std::time::*;

fn main(){

let sys_time = SystemTime::now();
let (tx, rx) = mpsc::channel();

for i in 1..11{
    let tx = tx.clone();
    let handle = thread::spawn(move||{
        let text = add_string(i);
        tx.send(text ).unwrap();
    });
    let _ = handle.join().unwrap();
}

println!("data sent at {:?}",sys_time.elapsed().unwrap());
std::process::abort();
for receiver in rx{
    println!("{}",receiver);  
}
let handle = thread::spawn(move || loop {
    match rx.try_recv() {
        Ok(res)=> {
            println!("{}",res);
        },
        Err(_e) => { break;
        }
    }
});
  let _  = handle.join().unwrap();
 println!("data received at {:?}",sys_time.elapsed().unwrap());
}


fn add_string(i: i32 ) -> String {
let mut  text: String = String::new().to_owned();
for _ in 0..i {
    text.push_str("A");
}
text
}

My question is if I change the receive part to this structure :

fn main(){

  let sys_time = SystemTime::now();
  let (tx, rx) = mpsc::channel();

  for i in 1..11{
     let tx = tx.clone();
     let handle = thread::spawn(move||{
        let text = add_string(i);
        tx.send(text ).unwrap();
     });
    let _ = handle.join().unwrap();
  }

  println!("data sent at {:?}", sys_time.elapsed().unwrap());
  for receiver in rx {
    println!("{}", receiver);
  }
  println!("data received at {:?}", sys_time.elapsed().unwrap());
}

the program cannot pass to laste statement which should print the time. it runs and runs infinitely after received all messages. my question is why I'm getting this behaviour in second design?!!!

The second code does not compile. Where is rc defined?

I had to delete some lines to get it to compile. Assuming my changes are correct, the issue is that the send side of your channel stays open , so your loop is still waiting for new messages (which never come). Manually dropping tx after spawning the threads solves the issue. With that, the original tx gets dropped there, and every clone of tx gets dropped at the end of their respective threads.

fn main() {
    let sys_time = SystemTime::now();
    let (tx, rx) = mpsc::channel();

    let mut threads = vec![];
    for i in 1..11 {
        let tx = tx.clone();
        let handle = thread::spawn(move || {
            let text = add_string(i);
            tx.send(text).unwrap();
        });
        threads.push(handle);
    }
    drop(tx);

    println!("data sent at {:?}", sys_time.elapsed().unwrap());
    for receiver in rx {
        println!("{}", receiver);
    }
    println!("data received at {:?}", sys_time.elapsed().unwrap());
}

Playground Link

1 Like

@Cerberuser
Sorry I had pasted wrong code, I've edited my question it's rx

@drewkett That's great You answered the exact thing was question in my mind, But I want to know is it an efficient and practical way to drop sender manually? if not how you can redesign my code to perform better and safer?
I just want to know the right ways to implement concurrent programs...

here when we are initiating a channel the channel and tx is opening right? so do I need to close it manually by drop or I can add _ prefix to tx (will be (_tx,rx))to prevent this issue.
just last question , is this a good practice to solve this issue or I should change my code design?

Appreciating for your comments. :slight_smile:

You don’t want to drop it immediately which is what immediately assigning it to _tx would do. Otherwise you won’t be able to clone it into the other threads. In this case I don’t think there is anything wrong with a drop(tx). You could accomplish the same thing by replacing that line with let _tx = tx but that just does the same thing. (Edit: this is incorrect. See @cuviper’s comment below)

The other approach would be to move the send code into a function that looks like this

fn launch_threads(tx: mpsc::Channel<String>) {
    for i in 1..11 {
          let tx = tx.clone()
          ...
    }
}

This the would take ownership of tx so it would automatically drop at the end of the function call, which you might like better

1 Like

No, the leading _ only supresses the unused variable lint. It doesn't drop the value. You need std::mem::drop() for that.

_ alone is the special non-binding pattern. Other _foo names just suppress unused warnings, but otherwise bind and drop like normal variables.

But even _ doesn't really force a drop. It's just that a moved/temporary value matched against it will have nowhere to go, thus drop.

2 Likes

Yeah it looks better if I can handle my threads and channels under functions, can you provide complete code base on my case by using custom function and channels as parameter.
Thanks for your good information. :slight_smile:

Thanks for explanation. now I understand _ won't work as drop

but in my case it worked!! I don't now how but when I used _ then it drops the main thread properly!! maybe there is an extra concept between channels and _
!!!

As @cupiver said above, let _ and let _tx are different. First will drop, second will not.

Finally by your guys advice I designed my code to this:

use std::sync::mpsc;
use std::thread;
use std::time::*;

fn main() {

    let sys_time = SystemTime::now();
    let (tx, rx) = mpsc::channel();
    launch_threads(tx);
    println!("data sent at {:?}",sys_time.elapsed().unwrap());
    rec_data(rx);
    println!("data received at {:?}",sys_time.elapsed().unwrap());
}


fn add_string(i: i32 ) -> String {
    let mut  text: String = String::new().to_owned();
    for _ in 0..i {
        text.push_str("A");
    }
    text
}

fn launch_threads(tx: mpsc::Sender<String>) {
    for i in 1..11 {
        let tx = tx.clone();
        let handle = thread::spawn(move || {
            let text = add_string(i);
            tx.send(text).unwrap();
        });
        let _ = handle.join().unwrap();
    }
}

fn rec_data(rx: mpsc::Receiver<String>) {
    for rec in rx{
        println!("{}",rec);
    }
}

as you can see I made it functional,Today I learned something new from you guys.
I just need your comment base on this design. is it a good practice? if there is anything I can Improve it let me know. :smiling_face_with_three_hearts:

Since "launch_threads" also waits for them to complete using join, I think a better name would be "run_threads". Otherwise it looks good to me. [ Edit: but see L.F.'s post below.... I missed that the threads are not running in parallel ]

Your code joins a thread before launching the next, thus making it effectively single-threaded. Is that what you want? You probably want to have all threads run simultaneously:

fn launch_threads(tx: mpsc::Sender<String>) {
    let mut threads = vec![];
    for i in 0..10 {
        let tx = tx.clone();
        let handle = thread::spawn(move || {
            let text = add_string(i);
            tx.send(text).unwrap();
        });
        threads.push(handle);
    }
    for thread in threads {
        thread.join().unwrap();
    }
}

Alternatively, you can use scoped threads:

use crossbeam::thread;

fn launch_threads(tx: mpsc::Sender<String>) {
    thread::scope(|s| {
        for i in 0..10 {
            let tx = tx.clone();
            s.spawn(move || {
                // ..
            });
        }
    });
}
3 Likes

That's great, every single comment is a tutorial for me !! :sweat_smile: :sweat_smile:
thank you

You brought important points to table which should I consider such as where I need to use single thread.
Thanks for constructive comment :pray:

1 Like

You actually don’t need to join the threads in this case, because the receiver waits for all the send handles to drop. I forgot I had done that in my original example cause I knew the threads weren’t running serially in the playground. That said, I like your approach better for being more explicit.

2 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.