My future eats all the CPU, how can I prevent this?

Hi,

I'm trying to make a tokio-based library for the xi editor. It's basically JSON-RPC, but the endpoints need to behave both as clients (sending requests/notifications) and as servers (handling incoming requests/notifications). I have an Endpoint struct that implements Future like this (the actual code is here):

impl<S, T: AsyncRead + AsyncWrite> Future for Endpoint<S, T>
where
    S: Service,
{
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // do some non-blocking stuff, mostly polling streams and channels, and spawning futures
        // [...]

        // notify the reactor to reschedule the current endpoint for polling
        futures::task::current().notify();
        Ok(Async::NotReady)
    }
}

The problem with this code is that the CPU jumps to 100% because the future keeps being polled.
I could just add a sleep (but what would a good value be?), and it doesn't feel right anyway: if the endpoint has a lot of work to do, I probably don't want it to sleep.

Is there a better strategy?

What is the reason for using notify here? It's only needed if you can't register interest in the event (in which case busy-waiting would be inevitable). If you are using Tokio channels or Tokio I/O functions, then they will automatically register interest in the events for you (behind the scenes).

I'm not sure, actually, because I don't fully understand what it does.
As far as I understand, though, it tells the event loop to poll this future again later, which is what the (now deprecated) unpark() used to do. Is that correct?

This line comes from one of my other crates, which is very similar (messagepack-rpc). It was necessary because tokio stopped polling the future after exactly nine polls. I created a branch which illustrates the problem:

git clone https://github.com/little-dude/rmp-rpc
cd rmp-rpc
git checkout bug
RUST_LOG=rmp_rpc=info cargo run --example ping_pong 

How many clients did you test the ping-pong example with? The issue might be that the Arc<Mutex> you're using to hold the pong counter has no way to let the event loop know when it is ready to be locked while another clone of the Arc is holding the lock. Mutex isn't a future-aware concurrency type, so that lock will likely spin, waiting for an unlock that doesn't know how to get your future run again.

Also, what @Fylwind is saying is that streams, channels and futures all have built-in mechanisms for registering interest in a change event on some relevant piece of data. That also means your Endpoint future automatically gets re-polled when the streams and channels it relies on have an event happen on them. They take a handle from futures::task::current(), which holds your future, and call notify() on it when some relevant event happens inside them.

Edit: the tokio.rs site explains how this event model works.
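
To make that mechanism concrete, here's a minimal hand-rolled sketch of what the Tokio types do behind the scenes (assuming futures 0.1; the Timer type and its fields are made up purely for illustration): stash the handle from futures::task::current() when returning NotReady, and notify() it from wherever the event fires.

extern crate futures;

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use futures::{Async, Future, Poll};
use futures::task::{self, Task};

// (ready flag, handle of the task that last polled us)
struct Timer {
    shared: Arc<Mutex<(bool, Option<Task>)>>,
}

impl Timer {
    fn new(delay: Duration) -> Timer {
        let shared = Arc::new(Mutex::new((false, None)));
        let clone = shared.clone();
        thread::spawn(move || {
            thread::sleep(delay);
            let mut state = clone.lock().unwrap();
            state.0 = true;
            // The event fired: wake the task that last polled us, if any.
            if let Some(task) = state.1.take() {
                task.notify();
            }
        });
        Timer { shared: shared }
    }
}

impl Future for Timer {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        let mut state = self.shared.lock().unwrap();
        if state.0 {
            Ok(Async::Ready(()))
        } else {
            // Register interest: save the handle so the timer thread can
            // notify() us, instead of the reactor busy-polling.
            state.1 = Some(task::current());
            Ok(Async::NotReady)
        }
    }
}

fn main() {
    Timer::new(Duration::from_millis(100)).wait().unwrap();
}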

That's what it does, yes. Generally, it's the responsibility of the "listening" functions (e.g. tokio_io::io::read, or futures::sync::mpsc::Receiver) to set up the notifications appropriately. That way, your task (Endpoint) gets polled only when something "interesting" happens, rather than unconditionally as it is now (wasting CPU).

In other words, whoever returns the "first" NotReady is the one responsible for setting up notifications. If you're just passing along the NotReady directly or indirectly from another function, then you don't need to do anything. If you have to insert current().notify() in random places, that often indicates a bug in the logic.
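
As a rough sketch of what "passing along" looks like (assuming futures 0.1; the PassThrough wrapper is hypothetical), the try_ready! macro does exactly this forwarding:

#[macro_use]
extern crate futures;

use futures::{Async, Future, Poll};

// Wraps an inner future and simply forwards its readiness.
struct PassThrough<F>(F);

impl<F: Future> Future for PassThrough<F> {
    type Item = F::Item;
    type Error = F::Error;

    fn poll(&mut self) -> Poll<F::Item, F::Error> {
        // try_ready! returns early with Ok(Async::NotReady) if the inner
        // future isn't ready. The inner future has already registered the
        // wakeup, so no current().notify() is needed here.
        let value = try_ready!(self.0.poll());
        Ok(Async::Ready(value))
    }
}

fn main() {
    let f = PassThrough(futures::future::ok::<u32, ()>(42));
    assert_eq!(f.wait(), Ok(42));
}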

I looked at your ping_pong example a bit. What causes Endpoint to stall is that you poll the stream only once, whether it was ready or not. This means that if the stream was ready and gave you a value, you never asked for more. The stream doesn't know whether you want more or not, so by not asking you let it assume that you're not interested in the stream anymore, and therefore your Endpoint will not be reawakened when the stream receives a new item. If you're lucky, something else might wake up Endpoint and cause it to inspect the stream, but if you're unlucky your Endpoint will just sleep forever.

The fix is to simply keep asking the stream for more items until it returns NotReady, at which point you can be certain that the stream will reawaken the task later on.
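
Something like this (a rough sketch against futures 0.1; the Drain wrapper is just a stand-in for your Endpoint, with the item handling elided):

extern crate futures;

use futures::{Async, Future, Poll, Stream};
use futures::stream;

struct Drain<S: Stream> {
    stream: S,
}

impl<S: Stream> Future for Drain<S> {
    type Item = ();
    type Error = S::Error;

    fn poll(&mut self) -> Poll<(), S::Error> {
        // Keep asking for items until the stream returns NotReady; only a
        // NotReady return guarantees the stream has registered the current
        // task for a wakeup.
        loop {
            match self.stream.poll()? {
                Async::Ready(Some(_item)) => {
                    // handle the item here (dispatch a request, etc.)
                }
                Async::Ready(None) => return Ok(Async::Ready(())),
                Async::NotReady => return Ok(Async::NotReady),
            }
        }
    }
}

fn main() {
    let s = stream::iter_ok::<_, ()>(vec![1u32, 2, 3]);
    Drain { stream: s }.wait().unwrap();
}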

How many clients did you test the ping-pong example with?

1, 5, 10, 100, 1000. Even with one client it hangs.

Thanks for the link to the docs; it made your reply and @Fylwind's reply much clearer. I'll remove the call to notify(), since it's not the right solution to my problem.

I think you're also right about the Mutex being an issue, because I can reproduce the hang with this code:

extern crate futures;
extern crate tokio_core;

use std::sync::{Arc, Mutex};
use std::thread::{sleep, spawn};
use std::time::Duration;

use futures::{Async, Future};
use tokio_core::reactor::Core;

struct MyStruct(Arc<Mutex<u32>>);

impl Future for MyStruct {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        println!("poll() called");
        match self.0.try_lock() {
            Ok(value) => {
                println!("lock acquired, value is {}", *value);
                Ok(Async::Ready(()))
            }
            Err(_) => {
                println!("lock not acquired. not ready to finish");
                Ok(Async::NotReady)
            }
        }
    }
}

fn main() {
    let shared = Arc::new(Mutex::new(0));
    let future = MyStruct(shared.clone());

    let thread = spawn(move || {
        println!("thread: start");
        let mut value = shared.lock().unwrap();

        println!("thread: sleeping with the log for one second");
        sleep(Duration::from_millis(1000));
        *value = 1;

        println!("thread: releasing lock");
    });

    sleep(Duration::from_millis(100));
    let mut core = Core::new().unwrap();
    core.run(future).unwrap();
    thread.join().unwrap();
}

Is that a well-known limitation? A bug? Can I work around it in my code? I think it's quite bad, because it means that users of my crate can't have futures that acquire a lock on a mutex.

Therefore, your Endpoint will not reawaken if the stream receives a new item.

Wow, that is tricky, but it totally makes sense now! Thanks a lot for looking at my code. I'm fixing this right now!

Edit: that fixes the issue! Thank you so much @Fylwind and @synthsym for all your explanations and help! Today I learned a lot, and I fixed the bug that has caused me the most pain since I started writing Rust code :smile:

I created a library to solve this exact issue, futures_mutex. It is future-aware, and will schedule futures that are waiting for a lock to be woken up so they can grab the lock. It's not a perfect solution: it can lead to starvation, and you need to remember to drop the lock when you're done so other futures don't deadlock.

Personally, I think that working with futures requires a different pattern of concurrency. Shared memory and the concurrency primitives that ensure unique access and atomic operations are suited to kernel-level threading and preemptive environments, but they aren't great for cooperative, event-driven futures.

Do not communicate by sharing memory; instead, share memory by communicating.

Make a future that receives messages to update a value, or sends a value to another future. The actor model works wonders for these kinds of problems.
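
For example, here is a rough sketch with a futures 0.1 unbounded channel (the counter and the messages are made up for illustration): one future owns the value and updates it from incoming messages, so no lock is needed and the receiver is woken automatically when a message arrives.

extern crate futures;

use futures::{Future, Stream};
use futures::sync::mpsc;

fn main() {
    // Channel-based alternative to Arc<Mutex<u32>>: one task owns the
    // counter; everyone else sends it messages instead of locking.
    let (tx, rx) = mpsc::unbounded::<u32>();

    let owner = rx.fold(0u32, |acc, delta| {
        // This future is re-polled automatically when a message arrives;
        // no manual notify() and no future-unaware Mutex involved.
        Ok::<_, ()>(acc + delta)
    });

    tx.unbounded_send(1).unwrap();
    tx.unbounded_send(2).unwrap();
    drop(tx); // close the channel so `fold` can complete

    assert_eq!(owner.wait(), Ok(3));
}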
