[SOLVED] Scoped threadpool and unused generics

Hey there,

I have a problem using a scoped thread pool.
As you can see in my tests I add some threads by the add() method to the pool and printing some lines out. I expected some output like
"1, 1.1, 2.1, 3.1, 2, 3" but it is always "1, 1.1, 2, 2.1, 3, 3.1"

The second problem is the generic type "T". I need a type T for Fn(T), but I do not need it in my struct... how can I avoid specifying a placeholder in my struct for type T? (If I do not add the placeholder the there is always the issue: parameter "T" is never used)

#![feature(unboxed_closures)]

extern crate scoped_threadpool;

use scoped_threadpool::Pool;

pub struct Worker<T: Send, F: Fn(T) + Send + Sync> {
    pool: Pool,
    callback: F,
    placeholder: Option<T>,
}

impl<T: Send, F: Fn(T) + Send + Sync> Worker<T, F> {
    pub fn new(callback: F) -> Self {
        Worker {
            pool: Pool::new(1),
            callback: callback,
            placeholder: None,
        }
    }

    pub fn add(&mut self, item: T) {
        let callback = &self.callback;
        self.pool.scoped(|scope| {
            scope.execute(move || {
                callback(item);
            });
        });
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time;

    #[test]
    fn test_add() {
        let mut w = Worker::new(|item| {
            thread::sleep(time::Duration::from_millis(1000));
            println!("{:?}", item);
        });
        w.add("1".to_owned());
        println!("1.1");
        w.add("2".to_owned());
        println!("2.1");
        w.add("3".to_owned());
        println!("3.1");
    }
}

appreciate your help :slight_smile:

// Update
Okay thank you for your help. I understand the problem and was able to fix it. Nevertheless I decided to follow a different approach

use std::thread;
use std::sync::mpsc::{channel, Sender};

pub struct Worker<T: Send + 'static> {
    sender: Sender<T>,
}

impl<T: Send + 'static> Worker<T> {
    pub fn new<F: Fn(T) + Send + 'static>(callback: F) -> Self {
        let (itx, irx) = channel();
        thread::spawn(move || {
            loop {
                match irx.recv() {
                    Ok(res) => {
                        callback(res);
                    }
                    Err(_) => (), // should not be handled because.... it sucks
                }
            }
        });

        Worker { sender: itx }
    }

    pub fn add(&self, item: T) {
        self.sender.send(item).unwrap();
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_worker() {
        let w = Worker::<String>::new(|r| {
            println!("Wohoo: {:?}", r.clone());
        });

        w.add("1".to_owned());
        println!("1.1");
        w.add("2".to_owned());
        println!("2.1");
        w.add("3".to_owned());
        println!("3.1");
        w.add("4".to_owned());
        println!("4.1");
    }
}

The scoped APIs are designed so that the thread is joined when the scope ends. Which means that in your example, everything run in series. (Without that, you couldn't borrow from the enclosing stack frame, which is what the APIs are meant for.) You probably want to use pool.spawn() here.

Instead of a Option placeholder you can use std::marker::PhantomData which is a zero-sized type, so it doesn't take up space. The PhantomData docs say you should use PhantomData<*const T> if you don't actually own a value of type T.

1 Like

This method will block until the closure and all its jobs have run to completion.

Because you're only calling execute once inside each call to scoped, you don't actually ever have two threads running at the same time. This is an inherent tradeoff with scoped threads; in order to prevent any objects that item holds references to from going out of scope while the thread is running, the thread must end before the scope can.

If you want to run several threads at once, either move scoped outside of the loop and call execute repeatedly on the same scope, or use a non-scoped threadpool — I suspect your data might be 'static, and there are fewer moving parts that way.