Have multiple sender one receiver in Rust

In Go, it's relatively easy to program multiple senders and one receiver waiting for an event using channel.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	c :=make(chan int)
	for i:=0;i<5;i++{
		go sender(i, c)
	}
	go receiver(c)
	for  {
		
	}
}

func sender(thread int,c chan int)  {
	for {
		r:= rand.Intn(100)
		if r%5==0 {
			fmt.Println("Thread",thread,"sending",r)
			c <- r
		}else {
			time.Sleep(5 * time.Second)
		}
	}
}

func receiver(c chan int)  {
	for {
		select {
		case r := <-c:
			{
				fmt.Println("Received:",r)
			}
		default:
		}
	}
}

The output would be something like:

Thread 4 sending 40
Received: 40
Thread 0 sending 25
Thread 4 sending 0
Received: 25
Received: 0
Thread 3 sending 45
Received: 45
Thread 4 sending 95
Received: 95
Thread 1 sending 90
Received: 90

However, I find Rust is somewhat less multithread friendly like Go.
So, like the example in the above code, how to do it in Rust (2 separated function to send and receive) properly?
Thank you.

It's pretty much the same as in Go:

use std::sync::mpsc::{channel, Sender, Receiver};
use std::thread::{spawn, sleep};
use std::time::Duration;
use rand::{thread_rng, Rng};

fn main() {
    let (tx, rx) = channel();
    for i in 0..5 {
        let tx = tx.clone();
        spawn(move || sender(i, tx));
    }
    spawn(move || receiver(rx));
    loop {}
}

fn sender(thread: i32, c: Sender<i32>) {
    loop {
        let r = thread_rng().gen_range(0..100);
        if r % 5 == 0 {
            println!("Thread {} sending {}", thread, r);
            c.send(r).unwrap();
        } else {
            sleep(Duration::from_secs(5));
        }
    }
}

fn receiver(c: Receiver<i32>) {
    for r in c {
        println!("Received: {}", r);
    }
}
5 Likes

Isn't this loop {} technically UB? It'd probably be better to join() on the receiver thread, just to be sure.

1 Like

I'm no expert, but an infinite loop seems like it should be perfectly defined to me. Also, isn't one of Rust's main principles that UB should never occur unless unsafe appears in the program somewhere?

That's right, but AFAIK, problem lies in the LLVM: it treats empty loops as the code which can't make so-called "forward progress", thus UB. Here's the relevant article in Rust blog.

1 Like

I see; it's not technically UB in Rust, but it triggers a longstanding compiler bug that has a similar effect.

That issue was fixed in Rust 1.52 with the upgrade to LLVM 12.

8 Likes

Joining is still more idiomatic, right? The empty loop just doesn't sit right with me, but I could be completely wrong.

Yes, empty loops are not great. They consume a lot of CPU.

2 Likes

One more question if you do not mind. What if I have a default case for the receiver? Like

func receiver(c chan int)  {
	for {
		select {
		case r := <-c:
			{
				fmt.Println("Received:",r)
			}
		default:
                 ///DO SOMETHING HERE
		}
	}
}
fn receiver(c: Receiver<i32>) {
    loop {
        match c.try_recv() {
            Ok(r) => println!("Received: {}", r),
            _ => {
                // Do something else
            }
        }
    }
}

You might also be interested in crates like crossbeam-channel, which has a select! macro.

2 Likes

Now this is something I could not find. Maybe I am just really bad at googling but definitely an awesome piece of code.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.