Thread pool access

My functions for compressing and decompressing are now working quite nicely (here is a comparison with the flate2 crate: mine compresses quite a bit faster, and the result is even smaller):

flate2 compressed size=149398
flate2 test completed ok, n=100 time elapsed=1353 milli sec.
flate3 compressed size=146215
flate3 test completed ok, n=100 time elapsed=963 milli sec.

So I have started thinking about how they might be packaged and published as a crate (or whatever the right term is). One issue I am not quite sure how to handle is the thread pool that the compress function uses. Currently it is a parameter of the compress call:

use scoped_threadpool::Pool;

pub fn compress( inp: &[u8], p: &mut Pool ) -> Vec<u8>
{...}

but this seems "not quite right" to me, in the sense that the caller of the compress function really shouldn't be bothered with the internal workings of the compress function. What alternatives could I consider? Is there a common way to tackle this issue?

The easiest solution would be to make a Compressor struct that owns a thread pool, and let those threads lie idle while nothing is being actively compressed.

As an alternative, you could use something like OnceCell to have a static Mutex<Pool> that all compress calls use, and they sit in a queue if called concurrently.
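
A minimal sketch of the static-pool idea, using std's `OnceLock` (the standard-library counterpart of `once_cell::sync::OnceCell`). Because `scoped_threadpool` is not part of std, `Pool` here is a hypothetical stand-in that only records a thread count and spawns scoped threads on each call; the "compression" is a placeholder that copies each chunk unchanged, and the thread count of 4 is an arbitrary assumption.

```rust
use std::sync::{Mutex, OnceLock};

/// Hypothetical stand-in for `scoped_threadpool::Pool` (std only):
/// it records a thread count and spawns scoped threads on each call.
pub struct Pool {
    threads: usize,
}

impl Pool {
    pub fn new(threads: u32) -> Self {
        Pool { threads: threads.max(1) as usize }
    }

    /// Runs `f` on up to `threads` chunks of `data` in parallel and
    /// concatenates the results in input order.
    pub fn map_chunks<F>(&mut self, data: &[u8], f: F) -> Vec<u8>
    where
        F: Fn(&[u8]) -> Vec<u8> + Sync,
    {
        if data.is_empty() {
            return Vec::new();
        }
        let chunk_len = (data.len() + self.threads - 1) / self.threads;
        let f = &f; // share one reference with every thread
        std::thread::scope(|s| {
            let handles: Vec<_> = data
                .chunks(chunk_len)
                .map(|c| s.spawn(move || f(c)))
                .collect();
            handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
        })
    }
}

/// Lazily created process-wide pool; concurrent callers queue on the mutex.
static POOL: OnceLock<Mutex<Pool>> = OnceLock::new();

fn global_pool() -> &'static Mutex<Pool> {
    // Assumption: 4 threads; a real crate might size this to the machine.
    POOL.get_or_init(|| Mutex::new(Pool::new(4)))
}

/// Public entry point: callers never see the pool.
pub fn compress(inp: &[u8]) -> Vec<u8> {
    let mut pool = global_pool().lock().unwrap();
    // Placeholder "compression": each chunk is copied unchanged.
    pool.map_chunks(inp, |chunk| chunk.to_vec())
}
```

Callers that arrive while another compression holds the lock simply block until it is released. With the real crate, the body of `compress` would use `Pool::scoped` instead of the stand-in `map_chunks`.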

Either of these approaches could also have an alternative entry point if the caller already has a thread pool they want you to use.
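
A sketch of the two entry points, assuming hypothetical names `compress_with_pool` and `compress`. `Pool` is again a stand-in (not the real `scoped_threadpool` type) that only counts the jobs it has run, which makes the reuse visible; the work itself is a placeholder copy.

```rust
/// Hypothetical stand-in for a pool type such as scoped_threadpool::Pool;
/// it only counts the jobs it has run.
pub struct Pool {
    jobs_run: usize,
}

impl Pool {
    pub fn new() -> Self {
        Pool { jobs_run: 0 }
    }

    pub fn jobs_run(&self) -> usize {
        self.jobs_run
    }

    fn run(&mut self, inp: &[u8]) -> Vec<u8> {
        self.jobs_run += 1;
        inp.to_vec() // placeholder for the real parallel compression
    }
}

/// Entry point for callers who already have a pool and want it reused.
pub fn compress_with_pool(inp: &[u8], pool: &mut Pool) -> Vec<u8> {
    pool.run(inp)
}

/// Convenience entry point for everyone else: creates a pool per call.
pub fn compress(inp: &[u8]) -> Vec<u8> {
    let mut pool = Pool::new();
    compress_with_pool(inp, &mut pool)
}
```

Creating a pool per call is exactly the cost a pool is meant to avoid, so in practice the convenience function would probably fall back on an owned or global pool instead.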


I have a couple of thoughts to share. The first is that the code is currently structured as an executable. It's almost trivial to turn it into a library (with an associated executable, if you like... or even just an example). The Cargo documentation has plenty of info on project organization like this.

The second thing is that your code has a lot of public types, which exposes some of the internals. A clean interface will provide visibility of only the bare minimum surface area to work with the API effectively. (This concept is usually described as information hiding, encapsulation, or abstraction.)
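
A small illustration of information hiding with Rust's visibility rules. The module name `flate3`, the `level` field and the `encode_block` helper are hypothetical; only `Compressor`, `new` and `deflate` are public, so the helper and the field can change without breaking callers.

```rust
mod flate3 {
    /// Public facade: the only type callers can name.
    pub struct Compressor {
        // Private field (hypothetical): callers cannot construct or read it.
        level: u8,
    }

    impl Compressor {
        pub fn new() -> Self {
            Compressor { level: 6 }
        }

        /// Public method built on a private helper.
        pub fn deflate(&mut self, data: &[u8]) -> Vec<u8> {
            encode_block(data, self.level)
        }
    }

    /// Private helper: invisible outside this module.
    fn encode_block(data: &[u8], _level: u8) -> Vec<u8> {
        // Placeholder: a real encoder would emit DEFLATE blocks here.
        data.to_vec()
    }
}
```

Outside the module, `flate3::encode_block(...)` or `Compressor { level: 9 }` would fail to compile, which is the point.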

And to more directly address your question, I would personally design the library to offer a type that manages implementation details (like ownership of the threadpool), and implement methods on that type to provide functionality. Something like this:

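use scoped_threadpool::Pool;

pub struct Flate3 {
    pool: Pool,
    // TODO: Move channels for matches and checksums here
}

impl Flate3 {
    pub fn new() -> Self {
        // One worker per logical CPU (uses the `num_cpus` crate).
        let threads = u32::try_from(num_cpus::get()).unwrap();

        Self {
            pool: Pool::new(threads),
        }
    }

    pub fn compress(&mut self, inp: &[u8]) -> Vec<u8> {
        // Do something with `inp` and `self.pool`
        todo!()
    }

    pub fn inflate(&self, data: &[u8]) -> Vec<u8> {
        // ...
        todo!()
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_compress() {
        let mut ctx = Flate3::new();
        let data: [u8; 7] = [1, 2, 3, 4, 1, 2, 3];
        // Round trip: inflating the compressed bytes should give back the input.
        let compressed = ctx.compress(&data);
        assert_eq!(ctx.inflate(&compressed), data);
    }
}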

Pretty much everything else should be private. Unless of course you plan to expose a streaming API like the flate2 crate's GzEncoder.
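
If a streaming API were wanted, one common shape (used by flate2's `GzEncoder`) is a type that wraps any `Write`, implements `Write` itself, and has a `finish` method returning the inner writer. The sketch below assumes a hypothetical `Flate3Encoder`; it simply buffers input and writes it out unchanged where real compression would go.

```rust
use std::io::{self, Write};

/// Hypothetical streaming encoder in the spirit of flate2's `GzEncoder`.
pub struct Flate3Encoder<W: Write> {
    inner: W,
    pending: Vec<u8>,
}

impl<W: Write> Flate3Encoder<W> {
    pub fn new(inner: W) -> Self {
        Flate3Encoder { inner, pending: Vec::new() }
    }

    /// Writes out everything buffered so far and returns the inner writer.
    pub fn finish(mut self) -> io::Result<W> {
        // Placeholder: a real encoder would emit compressed blocks here.
        self.inner.write_all(&self.pending)?;
        self.inner.flush()?;
        Ok(self.inner)
    }
}

impl<W: Write> Write for Flate3Encoder<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Buffer input; a real encoder would compress full blocks as they fill.
        self.pending.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(()) // no-op in this sketch
    }
}
```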


Yes, it's clearly "under construction" at the moment. I am thinking about simply combining all the source files into a single lib.rs, but I'm not sure that's the right way to go. When looking at unfamiliar libraries, I find it frustrating to have to keep opening lots of different files to find the one I am interested in. On the other hand, perhaps "one struct, one source file" is a good rule.

I don't think the channels would be reused, though (is that even possible?).
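
Channels can in fact be kept and reused: a `Sender`/`Receiver` pair is an ordinary value that can live in a struct across many calls. The sketch below assumes a hypothetical `ChecksumWorker` with one long-lived thread; the "checksum" is a placeholder wrapping byte sum, not Adler-32 or CRC-32.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical checksum service: one worker thread plus a request/response
/// channel pair, created once and reused for every call.
pub struct ChecksumWorker {
    requests: Option<Sender<Vec<u8>>>,
    responses: Receiver<u32>,
    handle: Option<JoinHandle<()>>,
}

impl ChecksumWorker {
    pub fn new() -> Self {
        let (req_tx, req_rx) = channel::<Vec<u8>>();
        let (resp_tx, resp_rx) = channel::<u32>();
        let handle = thread::spawn(move || {
            // The loop ends when the request sender is dropped.
            for block in req_rx {
                // Placeholder checksum: wrapping sum of the bytes.
                let sum = block.iter().fold(0u32, |acc, &b| acc.wrapping_add(b as u32));
                if resp_tx.send(sum).is_err() {
                    break;
                }
            }
        });
        ChecksumWorker { requests: Some(req_tx), responses: resp_rx, handle: Some(handle) }
    }

    /// Sends one block and waits for its checksum; can be called repeatedly.
    pub fn checksum(&self, block: &[u8]) -> u32 {
        self.requests.as_ref().unwrap().send(block.to_vec()).unwrap();
        self.responses.recv().unwrap()
    }
}

impl Drop for ChecksumWorker {
    fn drop(&mut self) {
        // Dropping the sender closes the channel so the worker loop exits.
        self.requests.take();
        if let Some(h) = self.handle.take() {
            let _ = h.join();
        }
    }
}
```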

I guess the thread pool issue arises because creating a thread is such an expensive operation that you need a pool for performance reasons, even though logically the threads could just be created "on demand" in a perfect world. Wrapping the pool in a struct does seem to be a good idea, I agree: it would allow a different thread pool to be substituted without breaking the interface.
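
To illustrate the substitution point, here is a sketch of a `Compressor` whose private field could later become a real pool without changing `new` or `compress`. This version takes the "on demand" route with `std::thread::scope`, paying the thread-creation cost on every call; the per-chunk work is a placeholder copy, not real compression.

```rust
use std::thread;

/// Public interface: `new` + `compress`. How work is parallelised is private.
pub struct Compressor {
    threads: usize, // private: could later be replaced by a pool
}

impl Compressor {
    pub fn new() -> Self {
        let threads = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
        Compressor { threads }
    }

    pub fn compress(&mut self, inp: &[u8]) -> Vec<u8> {
        if inp.is_empty() {
            return Vec::new();
        }
        let chunk_len = (inp.len() + self.threads - 1) / self.threads;
        // Threads are created on demand and joined before `scope` returns.
        thread::scope(|s| {
            let handles: Vec<_> = inp
                .chunks(chunk_len)
                .map(|c| s.spawn(move || c.to_vec())) // placeholder work
                .collect();
            handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
        })
    }
}
```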

As a footnote, I have now published a crate which I think incorporates the suggestions above:

https://crates.io/crates/flate3

The compress example now looks like this:

let mut comp = flate3::Compressor::new();
let data = [1, 2, 3, 4, 1, 2, 3];
let cb: Vec<u8> = comp.deflate(&data);
println!("compressed size={}", cb.len());

Do you really need exclusive access to the thread pool? Doesn't that mean you block other threads from progressing as long as you're inside the compress function? I'd expect Pool to contain an UnsafeCell, i.e. you would be able to mutate it through a shared reference via some runtime locking method.
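
If the pool itself needs `&mut self`, the wrapper can still offer `compress(&self)` by putting the pool behind a `Mutex`, which is the runtime-locking approach described above. A sketch, assuming a hypothetical stand-in `Pool` whose mutation (a job counter) is why `&mut` is required:

```rust
use std::sync::Mutex;

/// Stand-in for a pool that needs `&mut self` (like scoped_threadpool::Pool).
struct Pool {
    jobs_run: usize,
}

impl Pool {
    fn run(&mut self, inp: &[u8]) -> Vec<u8> {
        self.jobs_run += 1; // this mutation is why `&mut self` is needed
        inp.to_vec() // placeholder work
    }
}

/// `compress` takes `&self`, so the struct can be shared (e.g. via `Arc`);
/// the Mutex enforces exclusive access to the pool at runtime.
pub struct SharedCompressor {
    pool: Mutex<Pool>,
}

impl SharedCompressor {
    pub fn new() -> Self {
        SharedCompressor { pool: Mutex::new(Pool { jobs_run: 0 }) }
    }

    pub fn compress(&self, inp: &[u8]) -> Vec<u8> {
        // Callers block here while another thread holds the lock.
        self.pool.lock().unwrap().run(inp)
    }

    pub fn jobs_run(&self) -> usize {
        self.pool.lock().unwrap().jobs_run
    }
}
```

This does not remove the blocking; it only moves it from the borrow checker to the lock, so concurrent compressions still run one at a time.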

I was wondering that, because I wanted to suggest enhancing the solution of @parasyte by using Cow<'a, Pool> in the struct and implementing From<&'a Pool> for it, i.e. the compress method would work for both an existing thread pool as well as without one. However, if you need exclusive access to the thread pool, then Cow would clone on write, which is not what you want. I'm generally uncertain about Pool supporting Cow.
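
Since `Cow` needs `ToOwned` (for sized types, effectively `Clone`) and only hands out shared borrows, a common substitute is a small enum that either owns the pool or borrows it mutably. A sketch with hypothetical names `PoolRef` and `Compressor`, and a stand-in `Pool` that is deliberately not `Clone`:

```rust
/// Stand-in for a pool that needs `&mut self` and is not `Clone`.
pub struct Pool {
    jobs_run: usize,
}

impl Pool {
    pub fn new() -> Self {
        Pool { jobs_run: 0 }
    }

    fn run(&mut self, inp: &[u8]) -> Vec<u8> {
        self.jobs_run += 1;
        inp.to_vec() // placeholder work
    }
}

/// Like `Cow`, but borrowing mutably, so no `Clone`/`ToOwned` is needed.
pub enum PoolRef<'a> {
    Owned(Pool),
    Borrowed(&'a mut Pool),
}

impl PoolRef<'_> {
    fn get(&mut self) -> &mut Pool {
        match self {
            PoolRef::Owned(p) => p,
            PoolRef::Borrowed(p) => &mut **p,
        }
    }
}

impl From<Pool> for PoolRef<'_> {
    fn from(p: Pool) -> Self {
        PoolRef::Owned(p)
    }
}

impl<'a> From<&'a mut Pool> for PoolRef<'a> {
    fn from(p: &'a mut Pool) -> Self {
        PoolRef::Borrowed(p)
    }
}

pub struct Compressor<'a> {
    pool: PoolRef<'a>,
}

impl<'a> Compressor<'a> {
    /// Accepts either an owned pool or `&mut` to a caller's pool.
    pub fn new(pool: impl Into<PoolRef<'a>>) -> Self {
        Compressor { pool: pool.into() }
    }

    pub fn compress(&mut self, inp: &[u8]) -> Vec<u8> {
        self.pool.get().run(inp)
    }
}
```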

The scoped method of scoped_threadpool takes &mut self, and the documentation example has mut pool. So yes, I think so.

Well, it depends a bit on what you mean by "other threads", I think. The threads in the scoped thread pool can only be used for compression while compression is active, but there can be many other threads in the system. It does seem like the potential for sharing threads is slightly limited, I would agree. (My grasp of all of this is weak, it has to be said.)


I just looked at the API, and the thread pool really does seem to be limited to handling one task with multiple jobs at a time. That means you cannot have a threaded application and reuse the threads for your compression while the rest of your application is still running. I assume that is an inherent limitation of scoped thread pools, and there's not much you can do about it short of rewriting the code for a non-scoped thread pool, which would be harder to accomplish.
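
For comparison, a minimal non-scoped pool (the shape used in the Rust book's final project) looks like the sketch below; the name `StaticPool` is hypothetical. Any thread can submit work at any time, but each job must be `'static`, so it has to own its input (e.g. a `Vec` or `Arc<[u8]>`) instead of borrowing a slice. That ownership requirement is the main reason porting the compressor would be harder.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Minimal non-scoped pool: long-lived workers pull boxed jobs from one
/// shared queue. Jobs must be `'static`, so they own their data.
pub struct StaticPool {
    sender: Option<Sender<Job>>,
    workers: Vec<JoinHandle<()>>,
}

impl StaticPool {
    pub fn new(size: usize) -> Self {
        let (sender, receiver) = channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size.max(1))
            .map(|_| {
                let rx = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // The lock guard is dropped at the end of this statement,
                    // before the job runs, so workers don't serialise.
                    let job = rx.lock().unwrap().recv();
                    match job {
                        Ok(job) => job(),
                        Err(_) => break, // sender dropped: shut down
                    }
                })
            })
            .collect();
        StaticPool { sender: Some(sender), workers }
    }

    pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        self.sender.as_ref().unwrap().send(Box::new(f)).unwrap();
    }
}

impl Drop for StaticPool {
    fn drop(&mut self) {
        self.sender.take(); // close the queue so workers exit
        for w in self.workers.drain(..) {
            let _ = w.join();
        }
    }
}
```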

Anyway, your current API could be improved by implementing From<Pool> for Compressor and From<Compressor> for Pool, i.e. enabling re-use of the scoped threadpool for those who need it for other things, too.
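
A sketch of those two conversions. `Pool` is a hypothetical std-only stand-in for `scoped_threadpool::Pool`, the default of 4 threads in `new` is an arbitrary assumption, and `deflate` is a placeholder copy. With the real crate, `impl From<Compressor> for Pool` is still allowed by the orphan rules, because `Compressor` is a local type appearing as the trait's type parameter.

```rust
/// Hypothetical stand-in for scoped_threadpool::Pool (std only).
pub struct Pool {
    threads: u32,
}

impl Pool {
    pub fn new(threads: u32) -> Self {
        Pool { threads }
    }

    pub fn thread_count(&self) -> u32 {
        self.threads
    }
}

pub struct Compressor {
    pool: Pool,
}

impl Compressor {
    pub fn new() -> Self {
        Compressor { pool: Pool::new(4) } // assumed default size
    }

    pub fn deflate(&mut self, data: &[u8]) -> Vec<u8> {
        // Placeholder: a real implementation would run jobs on `self.pool`.
        data.to_vec()
    }
}

/// Lend an existing pool to the compressor...
impl From<Pool> for Compressor {
    fn from(pool: Pool) -> Self {
        Compressor { pool }
    }
}

/// ...and take it back afterwards for other work.
impl From<Compressor> for Pool {
    fn from(c: Compressor) -> Self {
        c.pool
    }
}
```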