Probably found a bug

My code is built from code examples in the Rust book.

The problem appears when I enable a counter shared between threads (Listing 16-15 of the Rust book).

The first two screenshots show that the code works (it is based on Listing 20-24).

The next two screenshots show that the code breaks once I use the counter.
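For comparison, here is a sketch of the shared-counter pattern along the lines of Listing 16-15, wrapped in a function so it can be checked (count_with_threads is a hypothetical name, not from the book). The detail that matters is that every JoinHandle is joined before the counter is read.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Spawns `n` threads that each increment a shared counter once,
/// joins them all, and returns the final count.
fn count_with_threads(n: usize) -> i32 {
    // Arc gives shared ownership across threads; Mutex gives exclusive access.
    let counter = Arc::new(Mutex::new(0));
    let mut handles = Vec::with_capacity(n);

    for _ in 0..n {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            // The guard is dropped at the end of the closure, releasing the lock.
            let mut num = counter.lock().unwrap();
            *num += 1;
        }));
    }

    // Wait for every thread to finish before reading the result.
    for handle in handles {
        handle.join().unwrap();
    }

    // Bind to a local first so the MutexGuard is dropped before `counter`.
    let final_count = *counter.lock().unwrap();
    final_count
}

fn main() {
    println!("Result: {}", count_with_threads(10)); // prints "Result: 10"
}
```

Reading the counter without the join loop would be a race: the main thread could see any value from 0 to 10.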

Can you post your code and the error as correctly formatted text instead of screenshots?

Preferably as a reproducible example, since that allows others to test it locally.


Here is the code:

use std::sync::{Arc, mpsc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let server_process_center = ServerProcessCenter::new(2);
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        server_process_center.r#do(Box::new(move || {
            let mut number = counter.lock().expect("counter lock fail 1"); // comment out this line to get rid of the problem
            *number += 1; // comment out this line to get rid of the problem

            println!("aaaaaa");
        }));
    }
    assert_eq!(10, *counter.lock().expect("counter lock fail 2"));
}

pub trait ServerProcessCenterInterface {
    type Job;

    fn new(employee_quantity: usize) -> Self;
    fn r#do(&self, job: Self::Job);
}

pub struct ServerProcessCenter {
    employees: Vec<
        Box<
            dyn ServerEmployeeInterface<
                ReceiptWindow = Arc<Mutex<mpsc::Receiver<ServerJob>>>,
                WorkTool = Option<thread::JoinHandle<()>>,
            >,
        >,
    >,
    shipping_window: Option<mpsc::Sender<ServerJob>>,
}

impl ServerProcessCenterInterface for ServerProcessCenter {
    type Job = ServerJob;

    fn new(employee_quantity: usize) -> Self {
        assert!(employee_quantity > 0);
        let (shipping_window, receipt_window) = mpsc::channel();
        let receipt_window = Arc::new(Mutex::new(receipt_window));
        let mut employees = Vec::with_capacity(employee_quantity);
        for ordinal_number in 0..employee_quantity {
            employees.push(Factory::get_server_worker(
                ordinal_number,
                Arc::clone(&receipt_window),
            ));
        }

        Self {
            employees,
            shipping_window: Some(shipping_window),
        }
    }

    fn r#do(&self, job: Self::Job) {
        self.shipping_window
            .as_ref()
            .expect("Cannot get option value")
            .send(job)
            .expect("Cannot send job");
    }
}

impl Drop for ServerProcessCenter {
    fn drop(&mut self) {
        drop(self.shipping_window.take());

        for employee in &mut self.employees {
            println!(
                "Employee #{} returning instruments",
                employee.get_ordinal_number()
            );
            if let Some(work_tool) = employee.get_work_tool().take() {
                work_tool
                    .join()
                    .expect("Cannot join on the associated thread");
            }
        }
    }
}

pub trait ServerEmployeeInterface {
    type ReceiptWindow;
    type WorkTool;

    fn new(ordinal_number: usize, receipt_window: Self::ReceiptWindow) -> Self
        where
            Self: Sized;

    fn get_work_tool(&mut self) -> &mut Self::WorkTool;
    fn get_ordinal_number(&self) -> usize;
}

pub struct ServerEmployee {
    ordinal_number: usize,
    work_tool: Option<thread::JoinHandle<()>>,
}

impl ServerEmployeeInterface for ServerEmployee {
    type ReceiptWindow = Arc<Mutex<mpsc::Receiver<ServerJob>>>;
    type WorkTool = Option<thread::JoinHandle<()>>;

    fn new(ordinal_number: usize, receipt_window: Self::ReceiptWindow) -> Self {
        let work_tool = thread::spawn(move || loop {
            let message = receipt_window
                .lock()
                .expect("Cannot acquire a mutex")
                .recv();

            match message {
                Ok(job) => {
                    println!("Employee #{} got a job, doing", ordinal_number);
                    job();
                }
                Err(receive_error) => {
                    eprintln!("Got error from channel: {}", receive_error);
                    println!("Employee #{} stopping", ordinal_number);
                    break;
                }
            }
        });

        Self {
            ordinal_number,
            work_tool: Some(work_tool),
        }
    }

    fn get_work_tool(&mut self) -> &mut Self::WorkTool {
        &mut self.work_tool
    }

    fn get_ordinal_number(&self) -> usize {
        self.ordinal_number
    }
}

pub type ServerJob = Box<dyn FnOnce() + Send + 'static>;

pub trait FactoryInterface {
    fn get_server_worker(
        ordinal_number: usize,
        receipt_window: Arc<Mutex<mpsc::Receiver<ServerJob>>>,
    ) -> Box<
        dyn ServerEmployeeInterface<
            ReceiptWindow = Arc<Mutex<mpsc::Receiver<ServerJob>>>,
            WorkTool = Option<thread::JoinHandle<()>>,
        >,
    >;
}

pub struct Factory {}

impl FactoryInterface for Factory {
    fn get_server_worker(
        ordinal_number: usize,
        receipt_window: Arc<Mutex<mpsc::Receiver<ServerJob>>>,
    ) -> Box<
        dyn ServerEmployeeInterface<
            ReceiptWindow = Arc<Mutex<mpsc::Receiver<ServerJob>>>,
            WorkTool = Option<thread::JoinHandle<()>>,
        >,
    > {
        Box::new(ServerEmployee::new(ordinal_number, receipt_window))
    }
}

You're locking the mutex here:

assert_eq!(10, *counter.lock().expect("counter lock fail 2"));

while the jobs may still be running in the worker threads, which also try to lock the same mutex. Nothing makes main wait for the jobs, so the assertion can run before all ten increments have happened. What you probably intended was to wait for the threads to finish before checking the final result. You can do that by dropping server_process_center, because its Drop impl joins all the threads:

// ....
drop(server_process_center);
assert_eq!(10, *counter.lock().expect("counter lock fail 2"));
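A stripped-down, self-contained sketch of the same fix, assuming a simplified pool. Pool, execute, and run are hypothetical names standing in for ServerProcessCenter, r#do, and main. Each job sleeps briefly so that, without the drop, main would very likely read the counter too early; with the drop, the Drop impl joins the workers first.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical, simplified stand-in for ServerProcessCenter.
type Job = Box<dyn FnOnce() + Send + 'static>;

struct Pool {
    workers: Vec<Option<thread::JoinHandle<()>>>,
    sender: Option<mpsc::Sender<Job>>,
}

impl Pool {
    fn new(size: usize) -> Self {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers: Vec<_> = (0..size)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                Some(thread::spawn(move || loop {
                    // The receiver lock is released at the end of this statement,
                    // so other workers can take jobs while this one runs.
                    let message = receiver.lock().unwrap().recv();
                    match message {
                        Ok(job) => job(),
                        // Err only once the sender is gone AND the buffer is empty.
                        Err(_) => break,
                    }
                }))
            })
            .collect();
        Pool { workers, sender: Some(sender) }
    }

    fn execute(&self, job: Job) {
        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for Pool {
    fn drop(&mut self) {
        // Disconnect the channel; workers drain the remaining jobs, then exit.
        drop(self.sender.take());
        for worker in &mut self.workers {
            if let Some(handle) = worker.take() {
                handle.join().unwrap();
            }
        }
    }
}

fn run(jobs: usize) -> i32 {
    let counter = Arc::new(Mutex::new(0));
    let pool = Pool::new(2);
    for _ in 0..jobs {
        let counter = Arc::clone(&counter);
        pool.execute(Box::new(move || {
            // Simulate slow work so main would otherwise get ahead of the jobs.
            thread::sleep(Duration::from_millis(5));
            *counter.lock().unwrap() += 1;
        }));
    }
    // The fix: join the workers (via Drop) before reading the counter.
    drop(pool);
    let total = *counter.lock().unwrap();
    total
}

fn main() {
    println!("{}", run(10)); // prints 10
}
```

Dropping the pool explicitly keeps the ordering visible in the code; an inner scope around the pool would have the same effect.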

Now I see the panic message in the log.
Thanks a lot!

One more question: will the drop statement wait until all the tasks are done, or can it drop the workers before they finish?

I see that it does: I added a thread::sleep and still got all the results.

Let me correct myself:
thread::sleep is a blocking function, so I need to use an async analog of it.

That's right.

The drop function in this case goes through all the employees and calls join on the thread handle stored in each of them, which "Waits for the associated thread to finish." Each thread runs a loop that only ends when it gets an error from the channel receiver. That error happens only once the sender has been dropped and no messages are left waiting in the buffer. So even though the sender is dropped right away in the drop function

drop(self.shipping_window.take());

we are still able to receive the messages sent before the sender was dropped.

If the corresponding Sender has disconnected, or it disconnects while this call is blocking, this call will wake up and return Err to indicate that no more messages can ever be received on this channel. However, since channels are buffered, messages sent before the disconnect will still be properly received.

(from the documentation of Receiver::recv in std::sync::mpsc)
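A small single-threaded sketch of the behavior quoted above: messages sent before the Sender is dropped are still received, and recv() returns Err only after the buffer is empty. drain_after_disconnect is a hypothetical helper written for illustration.

```rust
use std::sync::mpsc;

/// Sends `values` on a fresh channel, drops the sender, then drains the receiver.
/// Returns every message received and whether a later recv() reports disconnection.
fn drain_after_disconnect(values: &[i32]) -> (Vec<i32>, bool) {
    let (tx, rx) = mpsc::channel();
    for &v in values {
        tx.send(v).unwrap();
    }
    // Disconnect: no Sender remains for this channel.
    drop(tx);

    let mut received = Vec::new();
    // Buffered messages are still delivered after the disconnect...
    while let Ok(v) = rx.recv() {
        received.push(v);
    }
    // ...and once the buffer is empty, recv() keeps returning Err(RecvError).
    let disconnected = rx.recv().is_err();
    (received, disconnected)
}

fn main() {
    let (received, disconnected) = drain_after_disconnect(&[1, 2, 3]);
    println!("{:?} {}", received, disconnected); // prints "[1, 2, 3] true"
}
```

This is why the workers in the Drop impl finish every queued job before their loops break.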

