CpuPool for non-static types vs scoped-pool

At a high level, I'm trying to read and deserialize records from a file. Doing this serially is really slow, so I'd like to do it in parallel. I have tried to create a minimal reproduction, but I couldn't get it working in the Rust Playground, so forgive the links to my project on GitHub.

My first attempt was using CpuPool via futures: https://github.com/wspeirs/logstore/blob/thread_pool/src/data_manager.rs#L79

When I try to compile this, I get the following error:

the type `[closure@src/data_manager.rs:93:44: 95:14 self:&mut data_manager::DataManager, loc:u64]` does not fulfill the required lifetime

I think I get this... basically self doesn't live forever ('static), and because I'm passing it to a thread that could live forever, that's a problem. However, I haven't a clue how to fix it. I've tried wrapping self in an Rc<RefCell<>> combo and an Arc<Mutex<>>; neither seems to work.

Next I tried using scoped-pool, as it addresses my exact issue. Using this approach, I run into borrowing issues: https://github.com/wspeirs/logstore/blob/thread_pool/src/data_manager.rs#L108

error[E0504]: cannot move `self` into closure because it is borrowed
error[E0597]: `self_clone` does not live long enough
error[E0597]: `loc` does not live long enough

I feel like I'm doing something fundamentally wrong here. I wouldn't think it'd be that hard to issue multiple calls to LogFile::get(...) and capture their results. How should I go about doing this? Thanks!

You need to add a move to the closure you give to execute - that will move the clones into the closure so it doesn’t borrow anything from the environment.

A typical approach for such cases is to either put the shared state into an Arc<Mutex<...>> and pass clones of that around, or (a more "extreme" version of the same thing) put all of a struct's internals into an inner struct that's encapsulated in an Arc<Mutex<...>>; for example:

#[derive(Clone)]
pub struct MainStruct {
    inner: Arc<Mutex<Inner>>,
}

struct Inner {
   // all the state goes here
}

Now your MainStruct can be cloned and the clones can be moved across threads and do mutation through the mutex.
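
For example, a minimal runnable sketch of that pattern (the count field is just a placeholder for the real state):

use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Clone)]
pub struct MainStruct {
    inner: Arc<Mutex<Inner>>,
}

struct Inner {
    count: u64, // placeholder for the real state
}

fn main() {
    let main = MainStruct {
        inner: Arc::new(Mutex::new(Inner { count: 0 })),
    };

    let clone = main.clone(); // cheap: just bumps the Arc refcount
    let handle = thread::spawn(move || {
        // `move` transfers the clone into the closure, satisfying the 'static bound
        clone.inner.lock().unwrap().count += 1;
    });
    handle.join().unwrap();

    assert_eq!(main.inner.lock().unwrap().count, 1);
}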

I added the move as you suggested, and also refactored things to rely less on self (namely by creating the scoped_pool locally), but I'm still getting borrowing issues that I don't understand how to fix: https://github.com/wspeirs/logstore/blob/thread_pool/src/data_manager.rs#L109

error[E0507]: cannot move out of borrowed content
   --> src/data_manager.rs:141:17
    |
141 |         let r = ret_vec.into_inner().unwrap();
    |                 ^^^^^^^ cannot move out of borrowed content

error[E0597]: `self_clone` does not live long enough
   --> src/data_manager.rs:135:17
    |
131 |                     match self_clone.lock().unwrap().log_file.get(loc) {
    |                           ---------- borrow occurs here
...
135 |                 });
    |                 ^ `self_clone` dropped here while still borrowed
    |
    = note: values in a scope are dropped in the opposite order they are created

Again, am I doing something fundamentally wrong? Is using a scoped_threadpool the right way to go here? All the examples I see online use a channel, and I'm wondering if that's what I'm missing when trying to share the state of the Vec I eventually want to return. Thanks again for the help, and any further guidance is greatly appreciated!

This is because you have the Mutex inside an Arc, which means you borrow the mutex from it. You’d need to first take the mutex out of the Arc via something like Arc::try_unwrap.

I’ve not used the scoped pool before but presumably it should allow you to avoid Arc entirely if the values the scoped threads use outlive the 'scope lifetime. So for instance you should be able to share a &Mutex across the threads in the scope and not need the enclosing Arc. Once the threads are done mutating the Vec, you can take it out of the Mutex.
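
A rough sketch of that shape (std's scoped threads are shown purely for illustration, with loc * 2 standing in for the real LogFile::get(loc); a scoped pool's execute follows the same borrowing pattern):

use std::sync::Mutex;
use std::thread;

fn parallel_collect(locations: &[u64]) -> Vec<u64> {
    // Own the Mutex in the outer scope; the worker closures only borrow it.
    let results = Mutex::new(Vec::new());

    thread::scope(|s| {
        let results = &results; // an explicit reference for the move closures
        for &loc in locations {
            s.spawn(move || {
                let value = loc * 2; // stand-in for the real LogFile::get(loc)
                results.lock().unwrap().push(value);
            });
        }
    }); // every spawned thread has been joined by this point

    // The borrows have ended, so we still own the Mutex and can consume it.
    results.into_inner().unwrap()
}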

And yes, channels are the more usual way to do this. You’d create a new channel, give each worker thread a cloned Sender, and retain the Receiver part in the main thread (or the thread that’s spawning the work onto the pool). The workers would send values over the channel, and the receiver would collect them into a local Vec (or whatever). This wouldn’t require any Mutex.
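
Roughly, that looks like this (plain std threads and std::sync::mpsc shown; workers on a pool would do the same, with loc * 2 again standing in for the real work):

use std::sync::mpsc;
use std::thread;

fn collect_via_channel(locations: Vec<u64>) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();

    for loc in locations {
        let tx = tx.clone(); // each worker gets its own Sender
        thread::spawn(move || {
            let value = loc * 2; // stand-in for the real LogFile::get(loc)
            tx.send(value).unwrap();
        });
    }
    drop(tx); // drop the original Sender so the Receiver sees the channel close

    // Blocks until every worker's Sender is gone; note the order isn't guaranteed.
    rx.into_iter().collect()
}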

The main benefit of scoped threads (or pools using scopes) is the closures running on that pool can reference data in the stack of the thread that spawns the work. In other words, you don’t need Arc to keep things alive (or to satisfy 'static lifetime requirements).

Thank you again for all your help... but I'm still not getting this :frowning:

https://github.com/wspeirs/logstore/blob/thread_pool/src/data_manager.rs#L90

I removed the Arc from the Vec and simply call lock() on the inner execute. However, now it complains that my ret cannot be moved out of borrowed content. I assume this is because the closures are borrowing ret?

I'm still facing the same issue with self_clone being dropped while it's still borrowed.

Clearly I'm new to all of this and not grokking it. I think I have a general idea of what's going wrong, just no clue how to fix it... is there anywhere I can read/learn more about this? I'm happy to put the time in, but at this point I feel like I'm searching in the dark, randomly trying things.

error[E0507]: cannot move out of borrowed content
   --> src/data_manager.rs:108:12
    |
108 |         Ok(ret.into_inner().unwrap())
    |            ^^^ cannot move out of borrowed content

error[E0597]: `self_clone` does not live long enough
   --> src/data_manager.rs:104:17
    |
100 |                     match self_clone.lock().unwrap().log_file.get(loc) {
    |                           ---------- borrow occurs here
...
104 |                 });
    |                 ^ `self_clone` dropped here while still borrowed
    |
    = note: values in a scope are dropped in the opposite order they are created

Not meaning to discourage you but... even if you get this code to work, I do not understand how it could be any faster than the single-threaded version in its current form, given that the tasks which you hand over to the thread pool spend most of their time holding a mutex over shared data (the DataManager and the output Vec). This prevents parallel code execution.

I think that in order to really leverage parallelism, you may need to redesign your log_file for concurrent access (so that you do not need an &mut self to call the get() methods of LogFile and DataManager, but only an &self). You will then be able to get rid of your Mutexes and streamline the parallel for loop on locations and subsequent collection of results into a Vec by delegating all that work to Rayon.
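
For example, once get() only needs &self, the parallel fetch could boil down to something like this (LogFile and Record here are placeholder types, not your actual ones):

use rayon::prelude::*;

// Placeholder types; the real LogFile::get would do the disk read and deserialization.
struct LogFile;
struct Record(u64);

impl LogFile {
    fn get(&self, loc: u64) -> Record {
        Record(loc)
    }
}

fn get_all(log_file: &LogFile, locations: &[u64]) -> Vec<Record> {
    locations
        .par_iter()                     // Rayon fans the iterations out over its pool
        .map(|&loc| log_file.get(loc))  // only &self is needed, so no Mutex around LogFile
        .collect()                      // results come back in input order
}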

The main issue which I expect to get in the way is the usage of a cache in LogFile. Caching and concurrent access do not like each other so much in general. One possible strategy, which may or may not be fast enough, is to mutex-protect LogFile's LRU cache, and make sure that you only lock the mutex when reading from the cache at the beginning of LogFile::get() and when inserting data into the cache at the end, leaving the intermediate computation free to execute in parallel. The drawback is that sometimes you will carry out some work multiple times instead of leveraging your cache, but as I said, caching and parallelism are sometimes somewhat antagonistic...
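
A rough sketch of that locking discipline, with a plain HashMap standing in for the LRU cache and a format! call standing in for the real disk read:

use std::collections::HashMap;
use std::sync::Mutex;

struct LogFile {
    cache: Mutex<HashMap<u64, String>>, // placeholder for the real LRU cache
}

impl LogFile {
    fn get(&self, loc: u64) -> String {
        // 1. Lock briefly to check the cache, then release the lock.
        {
            let cache = self.cache.lock().unwrap();
            if let Some(hit) = cache.get(&loc) {
                return hit.clone();
            }
        } // lock released here

        // 2. Do the slow read/deserialize with no lock held,
        //    so other threads can make progress in parallel.
        let value = format!("record at {}", loc);

        // 3. Lock briefly again to insert the result. Another thread may have
        //    done the same work in the meantime; that is the trade-off.
        self.cache.lock().unwrap().insert(loc, value.clone());
        value
    }
}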

This is because you have:

let ret = &Mutex::new(...);

Note how ret is a reference, and not an owned value. The above is roughly equivalent to:

let ret = Mutex::new(...);
let ret = &ret;

You want to have the first ret there: an owned Mutex, not a reference to one. You only want the closure to have a reference, but the outer scope should have the value. If you have the value, you can call into_inner on it. into_inner consumes the mutex so you must be the owner.

The second issue is due to temporaries and their lifetimes. You should write the code there as:

let mut me = self_clone.lock().unwrap(); // bind the guard so it lives across the match
match me.log_file.get(loc) {
    // ...
}

I’m on mobile so somewhat brief. If you have questions, ask and I’ll try to respond a bit more in depth later.

@HadrienG brings up good points about the scalability but let’s solve the ownership issue so you have that working and then can discuss the design.

@vitalyd, thank you again for all of the help... I very much appreciate it! I was able to get it to compile:

https://github.com/wspeirs/logstore/blob/thread_pool/src/data_manager.rs#L90

Clearly this is the "wrong" way to go. I had initially created ret as &Mutex::new(...); because I would get borrow errors when I didn't. My understanding is that with scope.execute(move || { ... }), any local variables used in the closure are moved into it. Therefore, my ret was being moved into the closure, and so the compiler wouldn't let me use it in the return Ok(ret.into_inner().unwrap()). I "solved" this by creating two variables and then wrapping the reference to ret in a scope, so its borrow would fall out of scope by the time I needed ret for the return. There has to be a better way to go about this, though!

@HadrienG, you're not discouraging at all... I'm very new to Rust and had never heard of Rayon before. I only recently added the cache to LogFile to try and speed it up. I would think I could use an RwLock around the cache (RwLock in std::sync). I also re-worked RecordFile's read_at() to use pread so that multiple threads can all read from different locations. Thanks again for the pointers.
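
Roughly, the reworked read has this shape (a simplified sketch, not the actual code; std's FileExt::read_exact_at maps to pread on Unix):

use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt; // read_at / read_exact_at use pread on Unix

// Takes &File, so multiple threads can read different offsets from the
// same file handle concurrently, with no seeking and no lock.
fn read_record(file: &File, offset: u64, len: usize) -> io::Result<Vec<u8>> {
    let mut buf = vec![0u8; len];
    file.read_exact_at(&mut buf, offset)?;
    Ok(buf)
}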

Pretty clear to me that Rayon was/is the way to go here:

https://github.com/wspeirs/logstore/blob/thread_pool/src/data_manager.rs#L84

In one line I'm able to get working code that had been taking me tens of lines. On top of that, the results are pretty solid:

Serial Gets

PT8.516698762S for 100000 inserts
PT5.133184714S time for 1 get
PT5.103475168S time for 1 get
PT5.587023055S time for 1 get
PT5.103813894S time for 1 get
PT5.075737713S time for 1 get
PT26.003305362S for 100 gets

Parallel Gets

PT8.708934495S for 100000 inserts
PT1.559206563S time for 1 get
PT1.558827227S time for 1 get
PT1.573699968S time for 1 get
PT1.552675100S time for 1 get
PT1.565661282S time for 1 get
PT7.810134375S for 100 gets

So @HadrienG, thank you again for the Rayon pointer! @vitalyd, thank you very much for all your threading help. While I "have it working", I would be curious to see how I could have accomplished the same thing without Rayon. Thanks again!

That’s correct.

Also correct.

Right now that's pretty much the way to go with code like this. Once non-lexical lifetimes are in stable (they're available in the nightly compiler), you shouldn't need to play with explicit scopes like this.

Also, creating an explicit reference that you then use in a move closure is the way to move a reference in there (assuming your closure is a move one; not all have to be). The fairly implicit way closures capture their environment (and how move influences this) takes a bit of getting used to. This is quite different from, say, C++, where you have an explicit capture list and can specify the type of capture (value, ref, move) per captured item. However, the upside of the Rust way is that the compiler will not let you violate memory safety. It'll complain instead and make you scratch your head a bit (until you get the hang of it), but at least it won't let unsoundness through.

Rayon is very awesome! It'll be hard to beat it with a regular thread pool because Rayon's pool is a work-stealing one, so you'll get better utilization if you give it enough work. You have (I believe) a disk-IO workload, so you'll probably put threads to sleep while the IO is serviced; Rayon won't really help with that part. The crucial thing with such a workload is to ensure you don't hold a mutex while the IO is pending, or else every other thread will synchronize on that as well rather than make progress (if their IO is done).

But the way I’d do this (without rayon and without hyper optimizing) is probably with channels, rather than a mutexed Vec, spread over a normal thread pool.

@vitalyd, well thanks again for all the help. I clearly need to read-up on channels, threads and Mutexes in Rust.

I took one last stab at making this super-efficient by moving the cache from LogFile to DataManager: https://github.com/wspeirs/logstore/blob/thread_pool/src/data_manager.rs#L74

It all works, and I used some of the "tricks" I learned here today; however, it's not really much/any faster:

With Cache

PT8.482401779S for 100000 inserts
ON DISK: 100000
PT2.632130549S time for 1 get
ON DISK: 0
PT1.230524226S time for 1 get
ON DISK: 0
PT1.231673497S time for 1 get
ON DISK: 0
PT1.228589565S time for 1 get
ON DISK: 0
PT1.226049220S time for 1 get
PT7.549040479S for 100 gets

The extra logging shows how many records I'm having to go to disk for. After the first one, they're all in the cache, but it's still taking 1.2s to fetch all 100K records. I'm thinking this has to do with the copying I'm "forced" to do:

cache.borrow_mut().insert(loc, log.clone());
...
cache.borrow_mut().get_mut(&loc).unwrap().clone()

I don't know of any way around these copies, but I'm curious if anyone has thoughts. I could also be wrong; it might have to do with iterating over the list of locations to partition it, then again to fetch them from the cache, but I wouldn't think that's too slow.

I guess the question is: besides putting in println! statements with timing, is there a better way to do profiling in Rust to find out exactly what's slow?

Thanks again!

What types of values are you cloning there? I see LogValue is an enum but which variants are you ending up with?

One option is to put the LogValue into an Arc and then hand out clones of that, which will be an atomic refcount bump. If you’re cloning Vecs and the like, then this ought to be quicker.

For profiling: which OS are you on? On Linux you can try perf. On macOS, I think Instruments can profile Rust code, but I'm not certain.

What types of values are you cloning there?

cache.borrow_mut().insert(loc, log.clone()); - That is cloning a HashMap<String, LogValue>. For this particular test the LogValue variant is String, and the content is simply "localhost". I have to do this because the LruCache wants ownership of log, which seems reasonable and obvious.

cache.borrow_mut().get_mut(&loc).unwrap().clone() - That is copying a HashMap<String, LogValue> as well. It too will be the String variant. I'm having to do this clone because otherwise the type is &mut HashMap<String, LogValue>. So I'm really using clone() here to modify the type... which just feels wrong, but I'm not sure how to convert from &mut HashMap<_, _> to HashMap<_, _>.

Which OS are you on?

I'll check out perf... I didn't realize/think it would work with Rust; good to know! Thanks again!!!

Oh I didn’t realize you’re cloning the HashMap too. You can’t turn a &mut HashMap<...> into a HashMap<...> because of ownership - this is saying you’d want to take ownership of a mutably borrowed map.

What you can do, however, is share ownership - that’s done with Rc or Arc, depending on threading needs. If the map is never modified after being placed into the cache, then a simple Arc over the whole map will allow you to share it across threads with a single atomic refcount bump. That will certainly be cheaper than cloning the map and its contents.
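
A minimal sketch of that, assuming the map is never modified after being cached (a plain String value stands in for LogValue):

use std::collections::HashMap;
use std::sync::Arc;

fn main() {
    // Build the record once, then wrap the whole map in an Arc before caching it.
    let mut record: HashMap<String, String> = HashMap::new();
    record.insert("host".to_string(), "localhost".to_string());
    let cached: Arc<HashMap<String, String>> = Arc::new(record);

    // Handing out a "copy" is now just an atomic refcount bump,
    // not a deep clone of the map and its strings.
    let for_caller = Arc::clone(&cached);
    assert_eq!(for_caller["host"], "localhost");
}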

@vitalyd, thanks again for all the help! In the end, I just removed the cache, as I don't think it'll really help me all that much. The OS should be caching pages, which will be just about as fast as what I'm doing now. I will check out perf, though, and see where I'm spending all my time at some point. Thanks again!

Yeah, no problem. Linux will cache the pages, but you'll still take a syscall hit to read them, and the cache expiry/management policy is out of your hands. That may not be an issue, though, and you avoid double caching (there are ways to avoid that too, but that's a separate topic).