Questions about tokio/futures and RwLock


#1

Hi,
I’m just a hobby programmer coming from garbage-collected languages like JavaScript, and I’m starting to wrap my head around how to handle shared mutable state. I’m working on a server using tokio, and I would really appreciate it if anyone could take the time to answer some questions I have and check whether I’ve understood how to use smart pointers with tokio correctly. :slight_smile:

Basically, my situation is that I have a struct that holds information that some futures will read and a few futures will write. Using a RwLock seems appropriate, I think. However, if I understand tokio correctly, calling the blocking .read() or .write() on the RwLock would halt the entire program while any other future holds a (write) lock, unless I spawn futures on different threads. Right?

So the solution would be to spawn a future that tries to read/write the RwLock using the try_read() and try_write() methods, and if the lock can’t be acquired, the future returns Ok(Async::NotReady). This tells the event loop to leave this future and come back to it later, maybe after executing some other futures that have been spawned onto the event loop. (Am I still correct?)
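To make the non-blocking part concrete, here is a minimal sketch using the standard library’s std::sync::RwLock (the same try_read()/try_write() idea, without any event loop): a non-blocking write attempt simply returns Err while a read guard is alive, instead of halting the thread.

```rust
use std::sync::RwLock;

fn main() {
    let lock = RwLock::new(0i32);

    // A read guard is held...
    let reader = lock.try_read().unwrap();
    // ...so a non-blocking write attempt fails instead of blocking the thread.
    assert!(lock.try_write().is_err());
    drop(reader);

    // With no readers left, the write lock is available again.
    *lock.try_write().unwrap() += 1;
    assert_eq!(*lock.read().unwrap(), 1);
}
```

Note this only shows that try_write() doesn’t block; as discussed below, returning Async::NotReady after a failed try is a separate question, because the event loop also needs to know when to poll the future again.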

Now there is one thing that’s worrying me. If I try to acquire a write lock on my RwLock while someone else is reading, the future trying to get the lock will have to wait. But what happens if, during this time, another future gets a read lock before the first one drops its lock, and then yet another gets a read lock before the second one drops its, and so on? I know this might be rare, but I expect many readers. Could this mean that a writer has to wait indefinitely for the lock?

The solution I can think of is a struct wrapping the RwLock with an (atomic?) bool that indicates whether a writer is waiting. While a writer is waiting, no new read locks can be acquired; once the write lock has been granted and then dropped, the bool goes back to false. Is this necessary? Or is this maybe how the RwLock already works?

Wow, that was a lot of thoughts and questions… :slight_smile: If you have an answer to any of them, please don’t hesitate to help me out; any help would be greatly appreciated!

P.S. Do you think we can expect the futures::sync module to get a RwLock and more tools in the (near) future?


#2

So, the problem with returning Async::NotReady is that the event loop will not know about your blocking call (right, you have to know a little about epoll in order to use tokio; a first approximation is that you’re only allowed to return Async::NotReady when you’ve hit a std::io::Error of kind std::io::ErrorKind::WouldBlock, or received an Async::NotReady from another crate).

Your problem seems tricky. One thing you could do is to spawn a single future onto one thread’s event loop that continually waits on a futures::sync::mpsc channel, modifies the value when asked, and replies with the current value otherwise. The event loops on your other threads can then send messages to it and receive replies.

See https://docs.rs/futures/0.1.14/futures/sync/mpsc/index.html
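The same “one owner thread, message passing instead of locks” idea can be sketched with plain std threads and std::sync::mpsc (no futures involved). The Msg enum and the reply channel here are made up for illustration:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical message type: either mutate the state or ask for a copy.
enum Msg {
    Increment,
    Read(mpsc::Sender<i32>),
}

fn main() {
    let (tx, rx) = mpsc::channel::<Msg>();

    // One thread owns the state; nobody else touches it, so no lock is needed.
    let owner = thread::spawn(move || {
        let mut value = 0;
        for msg in rx {
            match msg {
                Msg::Increment => value += 1,
                Msg::Read(reply) => reply.send(value).unwrap(),
            }
        }
    });

    tx.send(Msg::Increment).unwrap();
    tx.send(Msg::Increment).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Msg::Read(reply_tx)).unwrap();
    assert_eq!(reply_rx.recv().unwrap(), 2);

    drop(tx); // closing the channel lets the owner thread finish
    owner.join().unwrap();
}
```

The futures version in #11 below follows the same shape, with a futures::sync::mpsc receiver in place of the blocking loop and a oneshot in place of the reply channel.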


#3

So is it just futures spawned on the same event loop that are reading/writing to the struct or is that struct shared with other threads (or across multiple event loops)?


#4

Thanks for the idea, pmeunier! If I understand it correctly, the value is still behind a RwLock, but it’s only written to in a future that listens to an mpsc channel, and the sender for the channel can be handed out freely to different futures or threads. The benefit would be that only the writer future can block, so I can just spawn it on another thread. Right?

But the thread doesn’t block when using https://doc.rust-lang.org/std/sync/struct.RwLock.html#method.try_write, right? Couldn’t I return Async::NotReady if try_write fails?

At the moment I do it all on the same thread and the same event loop.


#5

So then you don’t need any locking at all, right? :slight_smile:


#6

I don’t know… how would I do it? Don’t event loops require synchronization even if all the futures are executed on the same thread?


#7

You’d wrap the mutable data in a Cell or RefCell. This thread may help as well: Ownership issues with tokio and multiple types of futures
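A minimal sketch of that suggestion: share single-threaded mutable state between several handles with Rc<RefCell<T>>, no locks or atomics needed. Each future would hold its own clone of the Rc (the Vec here is just a stand-in for your server’s state):

```rust
use std::cell::RefCell;
use std::rc::Rc;

fn main() {
    // Shared, single-threaded mutable state: no lock, no atomics.
    let state = Rc::new(RefCell::new(Vec::new()));

    // Each "future" gets its own clone of the Rc handle.
    let writer = Rc::clone(&state);
    let reader = Rc::clone(&state);

    writer.borrow_mut().push("hello");
    writer.borrow_mut().push("world");

    assert_eq!(reader.borrow().len(), 2);
}
```

This works because everything runs on one thread; Rc and RefCell are deliberately not Send, so the compiler will stop you if you try to move them across threads.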


#8

Thanks for the link! I guess Cell and RefCell make sense; I’ll try them out later, but I fear I might have the same issue with having to “lock” or borrow it. Isn’t a RefCell just the same as a RwLock but for one thread?


#9

No, and that’s the whole point! Since a single thread will handle it, it can be on the stack as a normal variable.

You can, but the event loop won’t be aware of what caused the blocking, and won’t poll your future again. So, it won’t do what you want.


#10

I read some documentation on futures and with your comment now it makes sense. Thanks!

I’m sorry, but I can’t figure it out. Should I create the channels like this:

let (tx, rx) = unbounded::<String>();

Then listen on the channel on another thread:

cpu_pool.spawn_fn(move || {
    rx.for_each(move |msg| {
        // ...
        Ok(())
    })
});

But what should I do with the actual variable? And how do I read it?


#11

You could try something like:

    // Assuming an enum Msg { Increment, Read } and tokio_core's Core:
    let (sender, receiver) = futures::sync::mpsc::unbounded();
    let t = std::thread::spawn(move || {
        use futures::Stream;
        let mut core = Core::new().unwrap();
        let mut shared_variable = 0;
        core.run(receiver.for_each(|(msg, oneshot): (Msg, futures::sync::oneshot::Sender<Option<i32>>)| {
            match msg {
                Msg::Increment => {
                    shared_variable += 1;
                    oneshot.send(None).unwrap();
                }
                Msg::Read => {
                    oneshot.send(Some(shared_variable)).unwrap();
                }
            }
            Ok(())
        })).unwrap();
    });

And write to sender from the other threads (you can clone sender).


#12

Wow, thanks a lot! I’ll try it out, looks very nice! :slight_smile:


#13

I see @pmeunier has given you some great suggestions/pointers, but wanted to answer your RefCell question.

Yes, it’s like a single-threaded lock, but without the overhead of atomic operations. If the approach @pmeunier suggested works, then you should use it: the RefCell is “dangerous” because if you violate the borrowing rules at runtime, you’ll get a panic. Always prefer the compiler’s static borrow checking.
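To illustrate that runtime check: while a shared borrow of a RefCell is alive, a mutable borrow is refused. try_borrow_mut() reports this as an Err, whereas borrow_mut() would panic at that point:

```rust
use std::cell::RefCell;

fn main() {
    let cell = RefCell::new(0i32);

    // While a shared borrow is alive, a mutable borrow is refused...
    let shared = cell.borrow();
    assert!(cell.try_borrow_mut().is_err());
    // ...and calling borrow_mut() here would panic at runtime instead.
    drop(shared);

    // Once the shared borrow is dropped, mutation works again.
    *cell.borrow_mut() += 1;
    assert_eq!(*cell.borrow(), 1);
}
```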


#14

I don’t think it’s possible to hold the lock for more than one “tick” of the event loop safely, as that would require storing a lock guard inside the Future.


#15

Great suggestion! Indeed, RefCell is a nice way to share data between several futures in the same event loop (i.e. on the same thread). You can have a shared buffer to avoid allocations, for instance.
Note that as long as you don’t do any IO, your future is not going to be interrupted, contrary to what happens with threads.

Futures make it hard to miss IO, since you always have to chain it with something.


#16

That’s a great point! Definitely simplifies it all; I guess I overcomplicated it…

Yeah, I see now how RefCells are a natural solution in most cases, and I think I’ll use one for part of my code. For another part, where I need the “result” to be sent long after the invocation of the function, I’ll use @pmeunier’s solution, which fits perfectly.

Thank you so much for all the help! I don’t think I could ever have figured this out myself (especially pmeunier’s solution)!


#17

@4lbert I’ve had the same issue in another crate I’ve been building. My solution was to build a futures-aware mutex that registers interest in a lock and can notify a future when the lock is available. I’ve turned it into a crate called futures-mutex. Hopefully this can help you.


#18

Great library! Thanks for sharing! :slight_smile:


#19

For a newer alternative, try https://crates.io/crates/futures-locks .