Async streams and CPU usage

I created a wrapper type for asynchronously reading key events using crossterm as such:

use tokio::stream::Stream;
use futures_util::task::Context;
use tokio::macros::support::{Pin, Poll};
use crossterm::event;
use crossterm::event::{Event, KeyEvent, KeyCode};

const REFRESH_TIME: std::time::Duration = std::time::Duration::from_secs(0);

pub struct AsyncKeys(pub String);

pub enum KeyReturnObject {
    Line(String),
    CommandTab,
    None
}

impl Stream for AsyncKeys {
    type Item = KeyReturnObject;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Ok(true) = event::poll(REFRESH_TIME) {
            match event::read() {
                Ok(Event::Key(evt)) => {
                    match evt.code {
                        KeyCode::Backspace =>{
                            let rem_idx = self.0.len() - 1;
                            self.0.remove(rem_idx);
                            return Poll::Ready(Some(KeyReturnObject::None))
                        },

                        KeyCode::Enter => {
                            return Poll::Ready(Some(KeyReturnObject::Line(self.0.split_off(0))))
                        },

                        KeyCode::Char(val) => {
                            self.0.push(val);
                            return Poll::Ready(Some(KeyReturnObject::None))
                        },

                        KeyCode::Tab => {
                            return Poll::Ready(Some(KeyReturnObject::CommandTab))
                        }

                        _ => {}
                    }
                }

                _ => {}
            }
        }

        Poll::Ready(Some(KeyReturnObject::None))
    }
}

As you can see, I return Poll::Read(Some(KeyReturnObject::None))) when there's nothing available. I do this over Poll::NotReady because I don't want the stream to stop being polled. In the receiving end, I run this:

let mut async_keys = AsyncKeys(String::new());

    while let Some(ret) = async_keys.next().await {
        match ret {
            KeyReturnObject::Line(input) => {
                let trimmed = input.trim();
                let parts = trimmed.split(" ").collect::<Vec<&str>>();

                //let base_cmd = parts[0];

                if let Err(err) = handlers::handle(clap_app.0.lock().expect("unable to get read lock for clap app"), parts, &server_remote, &ctx) {
                    colour::red_ln!("{}", err.into_string());
                }

                print_prompt(false, &ctx)
            },

            KeyReturnObject::CommandTab => {
                log::error!("tab[0] pressed");
            },

            _ => {
                async_std::task::sleep(Duration::from_millis(50)).await;
            }
        }
    }

As you can see, if KeyReturnObject::None is returned from the poll, I sleep 50ms. When I use this method, the average CPU usage of the program jumps from 0.15% to 0.4% which is unnacceptable. I know that a waker can be used to get the stream to be polled again, but I'm not sure how to set this up. How might I get this to work without chewing up unnecessary CPU? Thanks

Why is this less efficient than in comparison to:

while let Some(line) = async_std::io::BufReader(async_std::io::stdin()).lines().next().await { ... }

How does the above "know" when a keyevent happens and needs to be awoken with the waker? Can that method be used for my above "key by key" approach?

It knows because your runtime will ask the OS to send a notification when there is data available on stdin.

I still recommend spawning a new thread with std::thread::spawn and sending any keypresses to async code using a channel.

Okay, if you say so. I trust :slight_smile:

Does the runtime "ask the OS" via mio?

Well async-std does not use mio anymore, but instead calls the OS-specific apis directly somewhere in the smol crate. If this was networking IO, it would indeed do so through epoll, kqueue or IOCP depending on your OS, and if you used Tokio, that would indeed happen through mio.

In the specific case of stdin, it does it by running a blocking call on the stdin from the standard library on another thread.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.