Help with data structure for synchronization

Hello,
I've stumbled upon the need for the following synchronization logic:
I'm keeping track of a series of ordered 'tasks' (some type T: Ord) whose completion times are unknown. I need a thread-safe structure that lets me 'block' an execution context until all tasks in the queue up to a certain point have finished.
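
To make the requirement concrete, here is a minimal sketch of the behaviour I'm after, using only the standard library. It is not my actual implementation; the names PendingTasks, start, finish, wait_up_to and demo are made up for illustration, and it assumes every task is registered before anyone waits on it.

```rust
use std::collections::BTreeSet;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical type: the set of unfinished tasks, kept in order.
pub struct PendingTasks<T: Ord> {
    pending: Mutex<BTreeSet<T>>,
    changed: Condvar,
}

impl<T: Ord> PendingTasks<T> {
    pub fn new() -> Self {
        PendingTasks {
            pending: Mutex::new(BTreeSet::new()),
            changed: Condvar::new(),
        }
    }

    /// Register a task as in progress.
    pub fn start(&self, task: T) {
        self.pending.lock().unwrap().insert(task);
    }

    /// Mark a task as finished, then wake all waiters so they re-check.
    pub fn finish(&self, task: &T) {
        self.pending.lock().unwrap().remove(task);
        self.changed.notify_all();
    }

    /// Block until no registered task ordered at or before `bound` is pending.
    pub fn wait_up_to(&self, bound: &T) {
        let guard = self.pending.lock().unwrap();
        // Keep waiting while the smallest pending task is still <= bound.
        let _guard = self
            .changed
            .wait_while(guard, |set| {
                set.iter().next().map_or(false, |min| min <= bound)
            })
            .unwrap();
    }
}

/// Usage: three tasks finish out of order; the caller waits for tasks 1 and 2.
pub fn demo() -> Vec<u32> {
    let tasks = Arc::new(PendingTasks::new());
    let finished = Arc::new(Mutex::new(Vec::new()));
    for id in 1..=3u32 {
        tasks.start(id);
    }
    let workers: Vec<_> = vec![3u32, 1, 2]
        .into_iter()
        .map(|id| {
            let (tasks, finished) = (Arc::clone(&tasks), Arc::clone(&finished));
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(10 * u64::from(id)));
                finished.lock().unwrap().push(id);
                tasks.finish(&id);
            })
        })
        .collect();
    tasks.wait_up_to(&2);
    // Tasks 1 and 2 are recorded here; task 3 may or may not be.
    let seen = finished.lock().unwrap().clone();
    for worker in workers {
        worker.join().unwrap();
    }
    seen
}
```

Any number of threads can call wait_up_to with different bounds, since each one just re-checks the smallest pending task whenever something finishes.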

This is my current implementation of such a structure.

First of all, I'm honestly astonished at Rust's existing synchronization primitives and at how easily I was able to write such a structure and have it compile and sort of work on the first run!
However, I'm unhappy with how I remove elements, queue.retain(|x| !ptr::eq(x, pointer)), and I think there's a better way to approach this.

My project relies very heavily on parallelism and I believe my current performance is bottlenecked by IO. Because of this I'll be migrating it to Tokio. My question is: does Tokio provide a similar abstraction for my needs, or could this be done in a much cleaner/faster way?

I gave this a go myself and have an (I believe) improved version (playground). It builds on its own kind of primitive, which I called Client + Watcher. (The Client type name comes from you.)

On the topic of porting this to async: Tokio does not seem to have a Condvar, but I’m positive that building the same kind of Client + Watcher primitive should be possible anyway.

I also think your version might have had bugs, in particular:

  • It doesn’t pass two of my tests that my version does pass (see the playground).
  • Multiple waiters can each call channel.recv() for the same message, but only one of those receives can succeed.
  • There is a race condition where after drop(queue) and before re-locking it, the queue could get resized, changing the pointers to (i.e. the memory locations of) all the queue elements, so the .retain might fail to remove the element.
    Edit: Even worse, the pointer to the highest element will always be the pointer to the first element, so your .retain always removes the first element (unless the heap was resized as described in the previous sentence).
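
One way to sidestep the pointer problem, as a minimal sketch that is separate from the playground code: identify each entry by a value that never changes, rather than by its address. The Registry type and the ticket counter below are hypothetical names chosen for illustration; the idea is just that a (priority, ticket) key stays valid no matter how the collection moves its elements around in memory.

```rust
use std::collections::BTreeSet;

/// Hypothetical registry: entries are identified by (priority, ticket),
/// so removal never depends on where an element lives in memory.
pub struct Registry<T: Ord + Clone> {
    next_ticket: u64,
    entries: BTreeSet<(T, u64)>,
}

impl<T: Ord + Clone> Registry<T> {
    pub fn new() -> Self {
        Registry {
            next_ticket: 0,
            entries: BTreeSet::new(),
        }
    }

    /// Insert an entry and return its key. The ticket makes the key unique
    /// even when two entries share the same priority.
    pub fn insert(&mut self, priority: T) -> (T, u64) {
        let key = (priority, self.next_ticket);
        self.next_ticket += 1;
        self.entries.insert(key.clone());
        key
    }

    /// Remove exactly the entry with this key; returns false if it is gone.
    pub fn remove(&mut self, key: &(T, u64)) -> bool {
        self.entries.remove(key)
    }

    /// The entry with the highest priority, if any.
    pub fn highest(&self) -> Option<&(T, u64)> {
        self.entries.iter().next_back()
    }

    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```

With this shape, a waiter keeps the key it got from insert and passes it back to remove after re-locking, and whatever happened to the collection in between does not matter.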

Thank you so much! I didn't mention that my structure broke in one of my project tests, and that was exactly because I couldn't figure out how to do multiple .recv()'s on the same channel for the same message. I also couldn't figure out how to pop the queue properly. This is some really beautiful code. It will take me some time to fully wrap my head around it, and I definitely didn't know a lot of these things were possible. Thank you!