How to get out of receiver loop in channel

I want to simulate a simple concurrent program. A car should be sent to the parking lot every 2 seconds, and then I need to check whether the lot is full. Let's say the lot has space for 5 cars, and a car cannot stay in it for more than 5 seconds.
My issue is with the receiver loop: I can't get out of it. How can I work with two channels together?
I would prefer to use mpsc.

use std::sync::mpsc;
use std::thread;
use std::time::*;

fn main(){
    let parking_capacity = 5;
    let (tx, rx) = mpsc::channel();
    let (tx2, rxe2) = mpsc::channel();
    cars_queue(tx);
    parking_entry_gate(tx2,rx,rxe2,parking_capacity);
}

fn cars_queue(tx: mpsc::Sender<i32>){
    let mut car_no = 1;
    let _=  thread::spawn(move ||loop {
        tx.send(car_no).unwrap();
        thread::sleep(Duration::from_secs(2));
        car_no +=1;
    });
}

fn parking_entry_gate(tx2: mpsc::Sender<i32>, rx: mpsc::Receiver<i32>,rxe2: mpsc::Receiver<i32>, mut parking_capacity:u32){

    for rec in rx {
        if parking_capacity < 1{
            println!("Parking is full");
        }else{
            println!("car number {} parked", rec);
            println!("parking capacity {} ", parking_capacity);
            let tx22 = tx2.clone();
            let _handle = thread::spawn( move|| {
                tx22.send(1).unwrap();
            });
           parking_capacity -= 1;
            for rec in &rxe2{
                thread::sleep(Duration::from_secs(5));
                println!("car number {} exited",rec);
                parking_capacity -= 1;
            }
        }
    }
}

(Playground)

Output:

car number 1 parked
parking capacity 5 
car number 1 exited

Errors:

   Compiling playground v0.0.1 (/playground)
    Finished dev [unoptimized + debuginfo] target(s) in 0.97s
     Running `target/debug/playground`
/playground/tools/entrypoint.sh: line 11:     7 Killed                  timeout --signal=KILL ${timeout} "$@"

I’m not clear what the end game really is here, but based on what you have, you could just use a single bounded channel mpsc::sync_channel(parking_capacity) to function as the parking lot. That will prevent cars from being parked when the channel is full.

Right now your code has the spawned thread sending a car every 2 seconds. The main thread, inside parking_entry_gate, parks the first car and then waits indefinitely on the second channel's receiver, rxe2. That inner loop will never exit as long as at least one sender for that channel is still alive, and tx2 always is. If you fix that, you'll run into the problem of tracking the parking capacity from multiple threads, which requires synchronization across threads and can't be done with a plain mutable variable.

All that said, sync_channel handles this scenario pretty much exactly. If there's more to the problem than what you've laid out, feel free to add more requirements.
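
To illustrate the point about the receiver loop: iterating over an mpsc::Receiver only ends once every Sender for that channel, including clones, has been dropped. The following is a minimal sketch of that behaviour, not a fix for the code above; `collect_until_closed` is a hypothetical helper name chosen for this example.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: spawns `producers` threads that each send one value,
// then drains the channel until it is closed.
fn collect_until_closed(producers: usize) -> Vec<i32> {
    let (tx, rx) = mpsc::channel();
    for id in 0..producers {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(id as i32).unwrap();
            // This clone of the sender is dropped when the thread ends.
        });
    }
    // Drop the original sender too. If it stayed alive, the loop below
    // would block forever waiting for more messages.
    drop(tx);

    // The iterator ends once all senders are gone and the queue is empty.
    let mut got: Vec<i32> = rx.iter().collect();
    got.sort();
    got
}

fn main() {
    println!("{:?}", collect_until_closed(3)); // prints [0, 1, 2]
}
```

In the original code the loop over &rxe2 never sees this condition, because tx2 is still owned by parking_entry_gate while the loop runs.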

My problem is on the receiver side: I don't know how to tell apart data coming from different senders. Assuming:

  1. there is a thread which sends a car every 2 seconds,
  2. there is a receiver which receives the car number (up to here it's fine; after this I don't know how to design the code),
  3. every car exits after, let's say, 5 seconds,
  4. and we need to respect a parking capacity of 5, for which you suggested a bounded channel, but how?!
    :thinking: :thinking:

Something like this is what I had in mind

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main(){
    let parking_capacity = 5;
    let (tx, rx) = mpsc::sync_channel(parking_capacity);
    cars_queue(tx);
    parking_exit_gate(rx);
}

fn cars_queue(tx: mpsc::SyncSender<i32>){
    let mut car_no = 1;
    let _=  thread::spawn(move ||loop {
        match tx.try_send(car_no) {
            Ok(_) => println!("car number {} parked", car_no),
            Err(mpsc::TrySendError::Full(_)) => println!("parking lot full"),
            Err(mpsc::TrySendError::Disconnected(_)) => unreachable!()
        }
        thread::sleep(Duration::from_secs(1));
        car_no +=1;
    });
}

fn parking_exit_gate(rx: mpsc::Receiver<i32>){
    for rec in rx {
        println!("car number {} exited",rec);
        thread::sleep(Duration::from_secs(5));
    }
}

Playground Link

I adjusted the sleep time for the cars coming in to 1s just so that the parking lot fills up before the Rust Playground kills the program.
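
On the earlier question of telling messages from different senders apart: one option, different from the sync_channel approach above, is to keep a single channel and wrap each message in an enum, so the one receiver loop can match on the kind of event. This is only a sketch under that assumption; `Event`, `run_gate` and `demo` are hypothetical names, and the events are sent from one thread without timing so that the result is deterministic.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;

// Hypothetical message type: every sender tags what happened.
enum Event {
    Arrived(u32),
    Left(u32),
}

// Processes events until all senders are dropped and returns a log.
fn run_gate(rx: mpsc::Receiver<Event>, capacity: usize) -> Vec<String> {
    let mut parked = 0usize;
    let mut waiting = VecDeque::new();
    let mut log = Vec::new();
    for event in rx {
        match event {
            Event::Arrived(car) if parked < capacity => {
                parked += 1;
                log.push(format!("car {} parked", car));
            }
            Event::Arrived(car) => {
                // Lot is full: remember the car instead of dropping it.
                waiting.push_back(car);
                log.push(format!("car {} waiting", car));
            }
            Event::Left(car) => {
                log.push(format!("car {} exited", car));
                if let Some(next) = waiting.pop_front() {
                    // The freed spot goes straight to the next waiting car.
                    log.push(format!("car {} parked", next));
                } else {
                    parked = parked.saturating_sub(1);
                }
            }
        }
    }
    log
}

fn demo() -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let exit_tx = tx.clone(); // a second sender feeding the same receiver
    tx.send(Event::Arrived(1)).unwrap();
    tx.send(Event::Arrived(2)).unwrap();
    tx.send(Event::Arrived(3)).unwrap();
    exit_tx.send(Event::Left(1)).unwrap();
    drop(tx);
    drop(exit_tx);
    run_gate(rx, 2)
}

fn main() {
    for line in demo() {
        println!("{}", line);
    }
}
```

In a timed simulation the arrival thread and the per-car exit threads would each hold a clone of the sender, which is what the "m" (multi-producer) in mpsc allows.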


Thanks for your time, but there is a problem: when the parking lot is full the sender gets an Err, which breaks out and stops execution. If the lot is full, I need the car to keep waiting until a space opens up...
I'm still working on finding a solution for this :sweat_smile: :sweat_smile:

Here's a different variant. I added some comments to the code as well.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main(){
    let parking_capacity = 5;
    // The sync_channel is a bounded channel, which means that it
    // buffers at most 5 items at any one time in this case.
    // If fewer than 5 items are in the channel, any new cars that
    // try to park (i.e. any cars we send to the channel) are
    // allowed to enter. To remove cars we recv them from the channel.
    let (tx, rx) = mpsc::sync_channel(parking_capacity);
    cars_queue(tx);
    parking_entry_gate(rx);
}

fn cars_queue(tx: mpsc::SyncSender<i32>){
    let mut car_no = 1;
    let _=  thread::spawn(move ||loop {
        // Non blocking version. In this version if a car can't park
        // it will go away because we don't try again
        // match tx.try_send(car_no) {
        //     Ok(_) => println!("car number {} parked", car_no),
        //     Err(mpsc::TrySendError::Full(_)) => println!("parking lot full"),
        //     Err(mpsc::TrySendError::Disconnected(_)) => unreachable!()
        // }
        
        // Changing this to send causes it to be a blocking operation.
        // In this version if the car can't park, the next car won't 
        // try to enter until 1 second after the last one parked
        match tx.send(car_no) {
            Ok(_) => println!("car number {} parked", car_no),
            Err(mpsc::SendError(_)) => unreachable!(),
        }
        thread::sleep(Duration::from_secs(1));
        car_no +=1;
    });
}

fn parking_entry_gate(rx: mpsc::Receiver<i32>){
    // Removes one car from mpsc receiver every 5 seconds if there
    // are cars in queue
    for rec in rx {
        println!("car number {} exited",rec);
        thread::sleep(Duration::from_secs(5));
    }
}

Playground
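
The capacity behaviour described in the comments above can be checked without any threads or sleeps. The sketch below assumes a capacity of at least 1; `fill_lot` is a hypothetical helper, not part of the code in this thread. It tries to park a number of cars with try_send, then frees one slot with recv and tries once more.

```rust
use std::sync::mpsc::{self, TrySendError};

// Hypothetical helper. Returns (cars parked, cars turned away,
// whether one more car could park after a single car left).
// Assumes capacity >= 1 and cars >= 1.
fn fill_lot(capacity: usize, cars: i32) -> (Vec<i32>, Vec<i32>, bool) {
    let (tx, rx) = mpsc::sync_channel(capacity);
    let mut parked = Vec::new();
    let mut turned_away = Vec::new();

    for car_no in 1..=cars {
        match tx.try_send(car_no) {
            Ok(()) => parked.push(car_no),
            // The buffer already holds `capacity` cars.
            Err(TrySendError::Full(car)) => turned_away.push(car),
            Err(TrySendError::Disconnected(_)) => unreachable!("rx is still alive"),
        }
    }

    // Receiving one car frees exactly one slot in the channel...
    let _left = rx.recv().unwrap();
    // ...so the next non-blocking send succeeds again.
    let retry_ok = tx.try_send(cars + 1).is_ok();

    (parked, turned_away, retry_ok)
}

fn main() {
    let (parked, turned_away, retry_ok) = fill_lot(5, 7);
    println!("parked: {:?}", parked); // prints parked: [1, 2, 3, 4, 5]
    println!("turned away: {:?}", turned_away); // prints turned away: [6, 7]
    println!("retry after one exit succeeded: {}", retry_ok); // prints true
}
```

With the blocking send used in the variant above, cars 6 and 7 would wait for a free slot instead of being turned away.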

That's great, but it still doesn't solve my problem!! :pensive: :pensive: Actually, I'm doing an exercise that has already been implemented in Go. The exact question is this:

simulate a carpark. Cars (threads) ‘enter’ the carpark every 2 seconds until it reaches max capacity (7 cars). Once full, cars have to wait their turn to enter the car park once a space is available (assume each car is parked for 15 seconds).

I've got as far as the code below. I'm just stuck at the last task: I want to pause the parent thread so that it stops receiving new cars, while the child threads keep working until there is space for a new car.

use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx,rx) = mpsc::channel::<i32>();
    car_queue(tx);
    entry_gate(rx);


}
fn car_queue(tx:mpsc::Sender<i32>){
    let mut car_no = 1;
    thread::spawn(move||loop{
        tx.send(car_no).unwrap();
        car_no += 1;
        thread::sleep(Duration::from_secs(2));
    });
}

fn entry_gate(rx:mpsc::Receiver<i32>){
    let  num = Arc::new(Mutex::new(7));

    loop{
        let parking_capacity = num.clone();
        *parking_capacity.lock().unwrap() -= 1 ; 
        if *parking_capacity.lock().unwrap() != 0{
            let car_no = rx.recv().unwrap();
            println!("car number {} parked\n################", car_no); 
            println!("parking capacity: {} \n################", *parking_capacity.lock().unwrap());
            let _handle= thread::spawn(move||{
                thread::sleep(Duration::from_secs(15));
                println!("car number {} exited \n################", car_no);
                *parking_capacity.lock().unwrap() += 1 ;
            }); 
        }else{
            println!("parking is full"); // here i want to pause the thread till parking_capacity is more than 0
        
        }   
    }
}

With the way the problem is stated, it does seem like they are asking to run 7 simultaneous threads and the cars are just used as a proxy for that. My code does not do that since each car is not represented by a separate thread.

To fix up your code, I think something like this should be okay.

loop{
        let parking_capacity = num.clone();
        if *parking_capacity.lock().unwrap() != 0{
            // Don't decrement until you know there is a spot. Since the check and the decrement
            // take the lock separately, this could be prone to timing issues where the value
            // changes in between the two. In this case it's fine, since this is the only line
            // that can decrement this value.
            *parking_capacity.lock().unwrap() -= 1 ; 
            let car_no = rx.recv().unwrap();
            println!("car number {} parked\n################", car_no); 
            println!("parking capacity: {} \n################", *parking_capacity.lock().unwrap());
            let _handle= thread::spawn(move||{
                thread::sleep(Duration::from_secs(15));
                println!("car number {} exited \n################", car_no);
                *parking_capacity.lock().unwrap() += 1 ;
            }); 
        }else{
            println!("parking is full"); // here we pause the thread until parking_capacity is more than 0
            thread::sleep(Duration::from_secs(1));
        }   
    }
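
If polling once per second is not desirable, the standard library's Condvar can put the gate thread to sleep until a car actually leaves. This is a sketch of that alternative, not the code from this thread: `Spots`, `acquire`, `release`, `available` and `simulate` are hypothetical names, and the durations are shortened so the example finishes quickly. The check and the decrement happen under a single lock here, so the timing concern mentioned in the comment above does not arise.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical counter of free spots, guarded by a mutex and a condvar.
struct Spots {
    free: Mutex<u32>,
    changed: Condvar,
}

impl Spots {
    fn new(capacity: u32) -> Self {
        Spots { free: Mutex::new(capacity), changed: Condvar::new() }
    }

    // Blocks until a spot is free, then takes it.
    fn acquire(&self) {
        let mut free = self.free.lock().unwrap();
        // Loop to guard against spurious wakeups.
        while *free == 0 {
            free = self.changed.wait(free).unwrap();
        }
        *free -= 1;
    }

    // Gives a spot back and wakes one waiting thread, if any.
    fn release(&self) {
        *self.free.lock().unwrap() += 1;
        self.changed.notify_one();
    }

    fn available(&self) -> u32 {
        *self.free.lock().unwrap()
    }
}

// Parks `cars` cars one after another, each staying for `stay`.
// Returns the number of free spots once every car has left.
fn simulate(capacity: u32, cars: u32, stay: Duration) -> u32 {
    let spots = Arc::new(Spots::new(capacity));
    let mut handles = Vec::new();
    for car_no in 1..=cars {
        spots.acquire(); // waits here while the lot is full
        println!("car number {} parked", car_no);
        let spots = Arc::clone(&spots);
        handles.push(thread::spawn(move || {
            thread::sleep(stay);
            println!("car number {} exited", car_no);
            spots.release();
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    spots.available()
}

fn main() {
    let free = simulate(2, 5, Duration::from_millis(50));
    println!("free spots at the end: {}", free);
}
```

For the exercise as stated, the call would be along the lines of simulate(7, n, Duration::from_secs(15)), with a 2 second sleep between arrivals added in the loop.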

I still think it's a good idea to use a bounded channel to control the capacity of the parking lot. It provides the mechanism you are asking about, namely how to make a thread wait for a spot to become available. Using channels this way takes a bit of getting used to, because traditionally channels are just used for sending information, and the capacity of a bounded channel is just some upper limit on in-flight messages.

I rearranged my last code a bit to show how you might accomplish what you want. I added a ParkingLot struct to manage the parking lot, and a second channel to use as a signal for when cars are leaving. This was needed so that the parking lot thread could own both receivers, since you can't clone the receiving end of a channel. The function representing the car in the lot doesn't need to manage anything with respect to the overall parking lot. As soon as the function is done, the parking spot frees up.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

struct ParkingLot {
    entry_snd: mpsc::SyncSender<()>,
    exit_snd: mpsc::SyncSender<()>,
}

impl ParkingLot {
    fn new(capacity: usize) -> Self {
        let (entry_snd, entry_rcv) = mpsc::sync_channel(capacity);
        let (exit_snd, exit_rcv) = mpsc::sync_channel(capacity);
        thread::spawn(move || {
            loop {
                // This is a signal received indicating that a car is leaving
                exit_rcv.recv().unwrap();
                // Receiving from the entry channel frees up one slot in the
                // channel, which is equivalent to freeing up one parking spot
                entry_rcv.recv().unwrap();
            }
        });
        Self {
            entry_snd,
            exit_snd,
        }
    }

    fn park(&self, f: impl FnOnce() + Send + 'static) {
        // This is a blocking send to the entry channel. It succeeds when the
        // channel is below capacity
        self.entry_snd.send(()).unwrap();
        // This will be used to send a signal to the ParkingLot thread that a
        // car is leaving. It needs to be cloned so that it can be moved to the
        // thread.
        let exit_snd = self.exit_snd.clone();
        thread::spawn(move || {
            // Run the passed function/closure now that the car is parked
            f();
            // Signal to parking lot thread that a car is leaving
            exit_snd.send(()).unwrap();
        });
    }
}

fn main() {
    let lot = ParkingLot::new(5);
    for i in 1.. {
        // This function blocks until there is a spot available
        lot.park(move || {
            println!("car {} parked", i);
            thread::sleep(Duration::from_secs(5));
            println!("car {} leaving", i);
        });
        // Wait 1 second before trying to park another car
        thread::sleep(Duration::from_secs(1))
    }
}

Playground

(On a side note, it turns out I don't find it very helpful to use cars as a metaphor for threads :slight_smile:)
