Thread synchronization performance issues

Hi there,

I've written the following code, which works fine so far.

use std::sync::{Arc, RwLock};
use std::collections::HashMap;

use chunk;

/// The RAM manager stores chunks in RAM. If a chunk cannot be found in RAM,
/// it passes the request on to the next manager.
pub struct RAMManager<M> {
    manager: M,
    chunks: Arc<RwLock<HashMap<String, Vec<u8>>>>
}

impl<M> RAMManager<M> where M: chunk::Manager + Sync + Send + 'static {
    pub fn new(manager: M) -> chunk::ChunkResult<RAMManager<M>> {
        Ok(RAMManager {
            manager: manager,
            chunks: Arc::new(RwLock::new(HashMap::new())),
        })
    }
}

impl<M> chunk::Manager for RAMManager<M> where M: chunk::Manager + Sync + Send + 'static {
    fn get_chunk<F>(&self, config: chunk::Config, callback: F)
        where F: FnOnce(chunk::ChunkResult<Vec<u8>>) + Send + 'static
    {
        trace!("Checking {} ({} - {}) in RAM", config.id, config.chunk_offset, config.chunk_offset + config.size);

        let chunks = self.chunks.clone();
        let chunk = match chunks.read().unwrap().get(&config.id) {
            Some(chunk) => Some(chunk.clone()),
            None => None
        };

        let chunks = self.chunks.clone();
        match chunk {
            Some(chunk) => {
                callback(Ok(chunk::utils::cut_chunk(chunk, config.chunk_offset, config.size)));      
            },
            None => {
                self.manager.get_chunk(config.clone(), move |result| {
                    match result {
                        Ok(chunk) => {
                            callback(Ok(chunk::utils::cut_chunk(chunk.clone(), config.chunk_offset, config.size)));

                            chunks.write().unwrap().insert(config.id.clone(), chunk);
                        },
                        Err(cause) => {
                            callback(Err(cause));
                            warn!("Could not store chunk {}", config.id);
                        }
                    };
                })
            }
        };
    }
}

As you can imagine, the get_chunk method is called in a multi-threaded environment (thread pool), so every access to self.chunks has to be synchronized.

I think I've made some locking mistakes, because the performance of the code is terrible. It is for streaming files, and it currently only reaches around 50 KB/s.

I've written the same code in Go and the performance is much better, but as I already said, I think there is something wrong with my locking right now.

You can find the full programs source at: https://github.com/dweidenfeld/plexdrive/tree/rust

Thank you for any help :slight_smile:
Dom

3 Likes

Some things that come to my mind on a first look:

  1. Why does self.chunks need to be stored in an Arc? And why do you need to clone it? As far as I can tell, the RAMManager is the only component accessing this data in the current code, in which case it could just own it.
  2. In your current design, you make a copy of the original chunk of data (which is potentially expensive if the underlying Vec is large) while holding a lock. Could this be the source of your performance issues?
  3. Moreover, you may potentially never need the whole chunk copy as you immediately pass it to cut_chunk, which extracts a subset of it. It might be a good idea to review your critical section in order to only copy what you need to copy instead (or, even more radical, to refrain from copying the chunk altogether by coming up with some shared access mechanism).
  4. If I'm not mistaken, cut_chunk is just about taking a slice of the input data. If so, you can use Rust's built-in slices for that, which are more idiomatic and whose implementation is likely to be more tightly optimized: chunk[offset as usize..(offset+size) as usize].to_vec().
  5. Is there any particular reason why you use a callback-based design instead of merely returning the data to the caller?

These are the main things that come to my mind on a first look, but there might be other things to investigate that become more obvious when looking at the usage patterns. For example, the chunk::Config struct owns a lot of dynamically allocated data (in the form of Strings) and may thus be expensive to create and destroy, but because you move it around you can only use it once. If you need to create such structs very often (because each of them represents a request and you expect a lot of requests per second), you may benefit from reusing these structs or having them hold borrowed data (like &str) instead.

2 Likes

Thank you for your response. I will review the implementation and see how I can use more shared state.

  1. The Arc is needed because I have to move the chunks into the closure (move |result| {})

Maybe I can get rid of the callbacks... will have to take a look at it :wink:

If I have further questions, I'll come back to this topic later.

But one last question... is it possible to return references, such as:

fn get_chunk(&self, config: Config) -> ChunkResult<&[u8]>;

or do I have to use

fn get_chunk(&self, config: Config) -> ChunkResult<Vec<u8>>;

I think I'm starting to understand why you needed callbacks and Arcs. You wanted to be able to run the callback asynchronously at some point after having returned from a Manager::get_chunk method, am I correct? I see that this is what the ThreadManager does.

If so, returning a simple Result from get_chunk won't be enough, as it would break the asynchronicity of your design by requiring that the result be available at the time where get_chunk terminates. You would need instead something which can manage an asynchronous result, such as a Future. The futures-cpupool crate provides a way to run tasks on a thread pool and return the futures of the results, if you are interested in that interface design direction.
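
For illustration, here is a minimal, hypothetical sketch of that direction (not taken from your repo), using the futures and futures-cpupool crates:

extern crate futures;
extern crate futures_cpupool;

use futures::Future;
use futures_cpupool::CpuPool;

fn main() {
    let pool = CpuPool::new(4);

    // Offload the (blocking) chunk fetch onto the thread pool; get a future back.
    let chunk_future = pool.spawn_fn(|| -> Result<Vec<u8>, String> {
        // ... fetch the chunk from the lower-level manager here ...
        Ok(vec![0u8; 4096])
    });

    // The caller decides when to wait for (or how to compose) the result.
    let chunk = chunk_future.wait().expect("chunk fetch failed");
    println!("got {} bytes", chunk.len());
}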

Sticking with callbacks for now, you could of course have your callback take a reference to a slice of data:

F: FnOnce(chunk::ChunkResult<&[u8]>) + Send + 'static

However, you must then ensure that your reference is valid at the time you invoke the callback. This is what makes this design a bit more difficult to use in a thread-safe world, as you obviously cannot return a reference to lock-protected data without holding the lock, so either...

  • You run the callback while holding the lock, and risk hitting lock contention issues if the callback is long-running.
  • You leverage the fact that your chunks are immutable after creation, and can thus safely be accessed concurrently by multiple threads, in order to keep them accessible even when the lock is not being held.

An example of the second strategy would be to store individual chunks in Arcs so that references to them can escape the lock:

chunks: Arc<RwLock<HashMap<String, Arc<Vec<u8>>>>>

With this design, you could take a reference to an individual chunk...

        let chunk = match self.chunks.read().unwrap().get(&config.id) {
            // Cloning an Arc<Vec<u8>>, not the Vec itself!
            Some(chunk) => Some(chunk.clone()),
            None => None
        };

...and then use that reference as you see fit, for example to feed a slice into your callback:

callback(Ok(&chunk[offset as usize..(offset + size) as usize]));

The trade-off here would be additional overhead when accessing small chunks, as you need to go through thread-safe reference counting for each individual chunk. If you expect your data chunks to be small, it may be faster to copy them, as reference counting only makes sense when the data is large and copying it is a performance burden.
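
Putting those pieces together, a self-contained sketch of this second strategy could look like the following (simplified, with made-up names, and without the fallback to the lower-level manager):

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

struct ChunkCache {
    chunks: RwLock<HashMap<String, Arc<Vec<u8>>>>,
}

impl ChunkCache {
    fn new() -> ChunkCache {
        ChunkCache { chunks: RwLock::new(HashMap::new()) }
    }

    fn get_slice<F>(&self, id: &str, offset: usize, size: usize, callback: F)
        where F: FnOnce(Option<&[u8]>)
    {
        // Cloning the Arc is cheap and lets the chunk outlive the lock guard.
        let chunk = self.chunks.read().unwrap().get(id).map(|c| c.clone());
        match chunk {
            // The read lock is already released here, yet the data is still valid.
            Some(chunk) => callback(Some(&chunk[offset..offset + size])),
            None => callback(None),
        }
    }

    fn insert(&self, id: String, data: Vec<u8>) {
        self.chunks.write().unwrap().insert(id, Arc::new(data));
    }
}

fn main() {
    let cache = ChunkCache::new();
    cache.insert("example".to_string(), vec![1u8; 10 * 1024 * 1024]);
    cache.get_slice("example", 0, 4096, |slice| {
        println!("got {:?} bytes", slice.map(|s| s.len()));
    });
}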

2 Likes

The chunks are usually 4k (4096). So I don't know if they're small or large for you :wink:

The problem is that I'll often have hundreds or even thousands of requests for the same chunk.

But one chunk is actually 10 MB... only after the cut is it 4096 bytes...

4 KB is pretty small for in-RAM data. Modern RAM runs at around 20 GB/s, so you can make a copy of that in ~200 ns. At that speed, you're likely to be bottlenecked by the overhead of locking, HashMap lookup, or dynamic memory allocation before copying the chunk of data becomes a performance issue. And that's before CPU caching even comes into play.

On the other hand, 10 MB is not so small. It takes ~500 µs to make a copy of that, which across thousands of requests can easily add up to seconds of unnecessary overhead if you copy all those 10 MB only to later extract a small 4 KB fraction from it.

So with this extra data, I think it's safe to say that you can greatly speed things up just by turning this bit...

        let chunk = match self.chunks.read().unwrap().get(&config.id) {
            Some(chunk) => Some(chunk.clone()),
            None => None
        };

...into this form, where you only copy the data that you need...

        let chunk_start = config.chunk_offset as usize;
        let chunk_end = chunk_start + (config.size as usize);
        let chunk = match self.chunks.read().unwrap().get(&config.id) {
            Some(chunk) => Some(chunk[chunk_start..chunk_end].to_vec()),
            None => None
        };

...then removing the call to cut_chunk below and adjusting the branch where the lower-level manager is called in the same manner.

Can you give this a try and see if it helps?

3 Likes

This definitely increased the performance, and I think I see the problem now... I have to work more with references and should avoid heavy copying.

You can track the changes in the latest commit: https://github.com/dweidenfeld/plexdrive/commit/39aa4f959d6dd2d150a522dbc1b7e1014c3e9778

There is still a lot of potential for optimization... it currently only gets around 1.5 MB/s, 2 MB/s at most.

2 Likes

I'd explore alternative designs as well:

  1. Chunk management on a single thread only. Callers request chunks via messages placed on an mpsc channel that the chunk manager services. The chunk manager returns a slice of the chunk as a reference over a return channel. Assuming callers keep the chunk manager alive via an Arc, returning references should work (but it's possible it won't be expressible with safe code only).

  2. Similar to #1 but shard chunk management across multiple threads, each responsible for some subset of chunks.

  3. Sticking with locking, but using striped locking - instead of one big lock over all the chunks, use a lock per "segment", where a segment is some subset of the chunks. Ideally this would be spread out such that it's unlikely for two or more threads to need chunks from the same segment (see the sketch after this list).
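
To make option 3 concrete, here is a minimal sketch of striped locking (hypothetical names, not code from your repo): the chunks are spread across a fixed number of independently locked segments, so two threads only contend when their chunk IDs hash to the same segment.

use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::RwLock;

const SEGMENTS: usize = 16;

pub struct StripedChunkStore {
    segments: Vec<RwLock<HashMap<String, Vec<u8>>>>,
}

impl StripedChunkStore {
    pub fn new() -> StripedChunkStore {
        StripedChunkStore {
            segments: (0..SEGMENTS).map(|_| RwLock::new(HashMap::new())).collect(),
        }
    }

    // Pick a segment by hashing the chunk id, so different ids spread out.
    fn segment(&self, id: &str) -> &RwLock<HashMap<String, Vec<u8>>> {
        let mut hasher = DefaultHasher::new();
        id.hash(&mut hasher);
        &self.segments[(hasher.finish() as usize) % SEGMENTS]
    }

    pub fn get_slice(&self, id: &str, offset: usize, size: usize) -> Option<Vec<u8>> {
        // Only this segment is locked; readers of other segments are unaffected.
        self.segment(id)
            .read()
            .unwrap()
            .get(id)
            .map(|chunk| chunk[offset..offset + size].to_vec())
    }

    pub fn insert(&self, id: String, chunk: Vec<u8>) {
        self.segment(&id).write().unwrap().insert(id, chunk);
    }
}

fn main() {
    let store = StripedChunkStore::new();
    store.insert("example".to_string(), vec![0u8; 10 * 1024 * 1024]);
    println!("{:?}", store.get_slice("example", 0, 4096).map(|slice| slice.len()));
}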

As @HadrienG mentioned, I think you might be bottlenecked by all the memory allocation and copying. Although memory bandwidth on modern hardware is in the tens of GB/s, you need to be linearly streaming through large ranges to actually get close to that rate. You should aim for zero copying/allocation for a given loaded chunk. It's a good exercise to see how difficult it is to arrange for that in Rust, safely.

3 Likes

Running everything in one thread is not the solution. I tried that before, and it makes one core spin up to 100% load while the rest of the program sits idle.

The distributed chunk storage sounds interesting... but I think it is hard to maintain.

Option 3 sounds very interesting to me. I think I will externalize the chunk storage from the manager and handle the logic in the storage unit.

Do you think using a channel through all managers instead of the callback function could increase the performance?

1 Like

Looking at your commits, here are some suggestions:

  • You only need to wrap the individual chunks in an Arc if you want to keep a reference to them outside of the lock and potentially send that reference to another thread, as I illustrated in my second post. If you intend to copy the relevant part of the chunk, as you currently do, reference-counting the original chunk is unnecessary as the copy is an independent value with its own lifetime.
  • You do not need to clone a chunk in order to cut it. You can cut the original chunk instead, avoiding one copy/allocation.
  • Consider carefully whether you really want to implicitly truncate the requested offset and size, as you do in the new version of cut_chunk. This effectively swallows cut_chunk usage errors, which you should probably report instead. By default, Rust panics when a slice is indexed out of bounds, and if you want the application to be able to recover from invalid chunk indexing, you could have cut_chunk return a Result instead (see the sketch after this list).
  • In general, as @vitalyd mentioned, try to refrain from cloning data and prefer moving and borrowing whenever possible. Many of the types which you are manipulating require dynamic memory allocation to clone, which doesn't come for free, and the associated memory copies may end up being expensive as well.
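
On the cut_chunk point, here is a minimal sketch of a borrowing, Result-returning version (assuming cut_chunk roughly takes a chunk plus an offset and a size; the exact signature lives in your repo):

pub fn cut_chunk(chunk: &[u8], offset: u64, size: u64) -> Result<Vec<u8>, String> {
    let start = offset as usize;
    let end = start + size as usize;
    if end > chunk.len() {
        // Report the usage error instead of silently truncating or panicking.
        return Err(format!(
            "requested range {}..{} is out of bounds for a chunk of length {}",
            start, end, chunk.len()
        ));
    }
    // Borrowing the chunk means callers don't have to clone it just to cut it.
    Ok(chunk[start..end].to_vec())
}
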
1 Like

You mean you ran chunk management on one thread without locking? How did you set up the communication in that case? The 100% CPU was probably due to massive amounts of allocation and copying, as I assume you were still cloning the Vecs.

If the callback involves locking, then channels should scale better. But as with all forms of message passing, you want to strive for a batching design to minimize communication overhead. I suppose a variation on approach #2 would be thread-local chunk management, with work done on those chunks also on the same thread, and in the (arranged to be) rare case where a given thread needs a chunk managed by another thread, it would request it via message passing. I agree that this is more complex.
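
As a rough illustration of the channel-based direction (a hypothetical sketch, not code from your repo): a dedicated chunk-manager thread owns the map without any locking and services requests sent over an mpsc channel, replying on a per-request channel. For simplicity, this sketch sends back a copy of the cut slice rather than a reference.

use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

struct ChunkRequest {
    id: String,
    offset: usize,
    size: usize,
    // Per-request reply channel; the manager sends the cut slice back on it.
    reply: mpsc::Sender<Option<Vec<u8>>>,
}

fn spawn_chunk_manager() -> mpsc::Sender<ChunkRequest> {
    let (tx, rx) = mpsc::channel::<ChunkRequest>();
    thread::spawn(move || {
        // The chunk map lives on this thread only, so no locking is needed.
        let mut chunks: HashMap<String, Vec<u8>> = HashMap::new();
        chunks.insert("example".to_string(), vec![0u8; 10 * 1024 * 1024]);

        for request in rx {
            let answer = chunks
                .get(&request.id)
                .map(|chunk| chunk[request.offset..request.offset + request.size].to_vec());
            // Ignore send errors: the requester may have gone away.
            let _ = request.reply.send(answer);
        }
    });
    tx
}

fn main() {
    let manager = spawn_chunk_manager();
    let (reply_tx, reply_rx) = mpsc::channel();
    manager.send(ChunkRequest {
        id: "example".to_string(),
        offset: 0,
        size: 4096,
        reply: reply_tx,
    }).unwrap();
    println!("got {:?} bytes", reply_rx.recv().unwrap().map(|chunk| chunk.len()));
}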

2 Likes

Okay. Thank you for all your help thus far.

Since it is a lot of code to change, it will take me some time to try out the other solutions. But I'll definitely send you the final result, so that everybody can profit from it :wink:

2 Likes

One last question in this context:

If I have my Manager trait and I pass one Manager instance across threads using Send + Sync + 'static, will access to the whole instance be locked when one thread calls one of the instance's methods, or does the instance have to handle concurrency internally?

Rust does not implicitly lock Manager instances. By requiring that a Manager be Sync, you make a promise to the Rust compiler that a Manager instance can be accessed concurrently from multiple threads without external synchronization. The Manager implementation is then responsible for following this interface contract internally.

Through clever static code analysis, the Rust compiler guarantees that if you don't use unsafe code (e.g. unsafe impl Sync for Manager), the Send + Sync + 'static bound on the Manager trait will automatically ensure that Manager implementations meet this thread-safety requirement.
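
As a small, hypothetical illustration (a simplified trait, not your actual Manager) of how the compiler enforces this:

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

// A simplified stand-in for your Manager trait, with the same bounds.
trait Manager: Send + Sync + 'static {
    fn get(&self, id: &str) -> Option<Vec<u8>>;
}

// Fine: the RwLock provides the internal synchronization, so this type is Sync.
struct LockedManager {
    chunks: RwLock<HashMap<String, Vec<u8>>>,
}

impl Manager for LockedManager {
    fn get(&self, id: &str) -> Option<Vec<u8>> {
        // Concurrency is handled internally; callers never take a lock themselves.
        self.chunks.read().unwrap().get(id).map(|chunk| chunk.clone())
    }
}

// Rejected: RefCell is not Sync, so this impl would fail to compile.
// struct UnlockedManager {
//     chunks: std::cell::RefCell<HashMap<String, Vec<u8>>>,
// }
// impl Manager for UnlockedManager { /* ... */ }

fn main() {
    let manager = Arc::new(LockedManager { chunks: RwLock::new(HashMap::new()) });
    // No external lock around `manager` is needed to share it across threads.
    let cloned = manager.clone();
    let handle = thread::spawn(move || cloned.get("example"));
    println!("from thread: {:?}", handle.join().unwrap());
    println!("from main:   {:?}", manager.get("example"));
}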

2 Likes

As an aside, it's still somewhat unclear to me what precisely you must guarantee when being Sync, specifically with regard to memory ordering. Can relaxed loads/stores be used? Acquire/release? Or is sequential consistency needed? A given type may be Sync in many ways in this regard - it may be safe internally, yet you can't piggy-back memory ordering effects off it, perhaps. Or it may be fully sequentially consistent. In some ways, the AtomicXXX types demonstrate this - they allow specifying the memory order per operation. That makes sense for them because they're low-level atomic building blocks. It's unclear to me how this extends to higher-level APIs. Does a Sync type need to know how it's going to be used in all contexts and offer different ordering guarantees? If not, is it supposed to indicate via rustdoc what ordering it uses, so callers know whether they need to enforce stronger ordering (or not)?

It seems like Sync is too coarse for the big picture scenarios. But maybe I'm missing something.

2 Likes

I think Rust's Sync trait is best understood in the linearizable model of concurrent computation. Namely, a Sync object should, from the point of view of external observers, move from one consistent state to another in an atomic and transactional fashion, and never be observable in an intermediary inconsistent state that breaks one of its class invariants.

Using low-level atomic building blocks, linearizable transactions can be built by designing them as a sequence of invisible non-atomic operations (or, occasionally, Relaxed atomic operations) which are all made simultaneously visible by the producer via a Release operation, and observed via Acquire or Consume operations on the consumer side.
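
As a concrete (if artificial) sketch of that producer/consumer pattern using the standard atomics:

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;

static READY: AtomicBool = AtomicBool::new(false);
static PAYLOAD: AtomicUsize = AtomicUsize::new(0);

fn main() {
    let producer = thread::spawn(|| {
        // The "invisible" part of the transaction: a Relaxed store that other
        // threads are not yet allowed to rely on...
        PAYLOAD.store(42, Ordering::Relaxed);
        // ...which is published atomically by the Release store on the flag.
        READY.store(true, Ordering::Release);
    });

    let consumer = thread::spawn(|| {
        // The Acquire load synchronizes-with the Release store above, so once
        // READY reads as true, the PAYLOAD write is guaranteed to be visible.
        while !READY.load(Ordering::Acquire) {
            thread::yield_now();
        }
        assert_eq!(PAYLOAD.load(Ordering::Relaxed), 42);
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}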

What do you think about this view?

2 Likes

I think that model works fine when describing solely that object's internal state. What's unclear to me, however, is what effects a caller/user of that object can expect. For instance, a given Sync object may use relaxed loads (or stores) and still uphold its contract. But what can I, as the user of said object, expect about my own loads/stores that come before or after using that object? For instance, if the object uses a relaxed load, I cannot assume that my loads that come after it in program order are actually ordered after the relaxed load - that relaxed load itself is atomic and transactional, but it says nothing about surrounding loads/stores.

Similarly, if the object uses a releasing store in one of its functions, should it say that in its rustdoc? Because the fact it's release, and not e.g. a sequentially consistent operation, has implications on what I do to protect the order of my surrounding operations. In a lot of places, you want to piggy-back off existing atomic operations for their memory effects rather than sprinkling additional fences.

Does that make sense?

2 Likes

I see. What you are worried about is interactions between multiple concurrent data structures, or between concurrent data structures and unsynchronized code, am I correct?

I tend to approach this issue with the rather extremist view that if a single thread needs to care about the relative order of two transactions on entirely unrelated pieces of data (without e.g. a data dependency between the transactions guaranteeing the order of execution in hardware), it's a bug waiting to happen, and should be fixed by reworking the synchronization protocol so that the two transactions are grouped into one. But that's not always a practical viewpoint.

In general, we always end up hitting the ages old issue that concurrent data structures, much like multi-threaded execution engines, do not compose very well with each other.

When merging the transactions is not practical, the short-term fix, in my opinion, is to add appropriate fences where you need them, and only come back to the code if you really have a performance problem. A longer-term fix would be to improve hardware and runtime support for composable software-defined atomic transactions (think e.g. hardware transactional memory). But the latter is certainly wishful thinking at this point, considering how much time it took even Intel to produce hardware transactions that work at all, without saying anything about the efficiency of the mechanism or the complexity of the transactions that can be expressed in hardware.

Going off on a tangent, aren't modern optimizing compilers smart enough to merge together related memory fences? (e.g. two acquire/release fences in a row)

2 Likes

Yup. In a language with a well-defined memory model, like Java, the memory effects of the operations that a class exposes are usually described in the javadoc (the rustdoc equivalent). There's no "blanket" Sync, so to speak, that effectively says "it's safe to share across threads". A class explicitly documents whether it's safe to access concurrently and what memory guarantees it offers via those operations. The fact that the class itself transitions atomically and transactionally (or, put more generally, maintains its own invariants) is pretty much implied. The interesting bits are what non-threadsafe code that's using that type can assume about its own operations. The "Memory Consistency Properties" section is a good example of the type of documentation they provide. This is important because, absent those guarantees, the caller pretty much cannot assume anything about memory ordering that occurs as a result of using the threadsafe API. As such, they would need to insert their own fences. This can be problematic performance-wise because at enough layers of abstraction you can end up with multiple fences sprinkled very close to each other, and in hot paths that can have significant slowdown effects.

So Sync in Rust, being an OIBIT (auto-derived) trait, seems a bit scary. You may concoct a Sync impl purely because it's built of other Sync components, but they may not interact properly with each other - they're only consistent within themselves. So Sync ends up giving a false sense of "security", and the author may not even realize that their components don't interact properly - the Sync just falls out invisibly.

Having said that, I guess I have a slight discomfort with Sync being an OIBIT - I get the rationale, but I think it's dangerous.

2 Likes

They can, to some degree - the stars have to align properly though :slight_smile:. I think literally back-to-back fences will get eliminated (assuming inlining happens and all that fun stuff, which may not be the case across layers of code). But it's hard to rely on that.

On this topic, a couple of interesting links:
http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/n4455.html

2 Likes