Async C API help

I'm writing a Rust API on top of an async C library.
On the C side, I have 2 functions (simplified):

bool is_ready(int job_id);
int start_job();

On the Rust side, one option is to do something like this:

task::spawn_blocking(|| {
   let id = ffi::start_job();
   loop {
      if ffi::is_ready(id){break}
   }
});

But I'd rather create a Future, for convenience:

struct Job {
    job_id: i32,
}

impl Future for Job {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if ffi::is_ready(self.job_id) {
            Poll::Ready(())
        } else {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

I'm not sure it's a good idea to implement it like that. I've read somewhere that waking a future from poll() is fine, but my concern is that it could defeat the purpose of async. We know we shouldn't run blocking code within async, but is the code above effectively blocking if it constantly wakes the task so the future gets polled again? Can other tasks still run concurrently with a future implemented like this?

You shouldn't do either. It's best if you can get C to send a notification.

Your Future is not incorrect per se, but it's going to consume a lot of CPU.

Ideally the C API will provide a callback to tell you when the operation is done, and this is where you'd wake the waker.
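
The general shape of that looks something like this (just a rough sketch, not tied to your actual C API; Shared and mark_done are made-up names): keep some state shared between the future and the callback, park the waker in poll(), and have the callback's Rust-side shim wake it.

use ::std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
};

// State shared between the future and the completion callback.
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

struct Job {
    shared: Arc<Mutex<Shared>>,
}

impl Future for Job {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared = self.shared.lock().unwrap();
        if shared.done {
            Poll::Ready(())
        } else {
            // Park the waker; nothing polls us again until the callback wakes it.
            shared.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Invoked (via a small `extern "C"` shim) when the C side reports completion.
fn mark_done(shared: &Mutex<Shared>) {
    let mut shared = shared.lock().unwrap();
    shared.done = true;
    if let Some(waker) = shared.waker.take() {
        waker.wake();
    }
}

With that, poll() only runs again once the job has actually completed, instead of on every executor iteration.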

You aren't "blocking" per se; it's more that the executor has a queue of pending tasks and a set of blocked tasks, and it will sit in an infinite loop popping the same task off the pending queue and polling it.

Imagine the code polling your future looks something like this (massively simplified):

struct Executor {
  pending: Queue<Task>,
  blocked: Vec<Task>,
}

impl Executor {
  fn run(&mut self) {
    loop {
      let next_task = self.pending.pop();
      // the context's waker re-enqueues the task on `pending` when woken
      let mut ctx = self.new_context_which_enqueues_on_wake(&next_task);
      next_task.poll(&mut ctx);
      self.blocked.push(next_task);
    }
  }
}

If every poll to your Future triggers a wake, your task will be immediately put back in the pending queue.

So while the system won't lock up in an infinite loop (tasks can be woken and moved from the blocked to the pending queue by something in the background), when nothing else runs you'll be continually popping the next pending task, polling it, triggering a wake, and moving it back to the pending queue, ad infinitum.

The best kind of "raw" async libraries are those that offer a callback when ready, so maybe yours does feature that somewhere?

Ideally:

typedef void (*on_ready_cb)(void * ctx);

int start_job (
    void * ctx, // may be NULL
    on_ready_cb on_ready // may be NULL
);

In that case, you'd be able to write:

#[allow(nonstandard_style)]
mod ffi {
    pub
    use ::std::os::raw::{c_int, c_void};

    pub
    type on_ready_cb = Option<
        unsafe extern "C"
        fn (ctx: *mut c_void)
    >;

    extern "C" {
        pub
        fn start_job (ctx: *mut c_void, on_ready: on_ready_cb)
          -> c_int
        ;
    }
}

fn job () -> impl ::std::future::Future<Output = ()>
{
    use ::tokio::sync::oneshot;
    enum SignalDropOnly {}

    // A zero-shot channel 😄
    type Sender = oneshot::Sender<SignalDropOnly>;
    let (sender, receiver) = oneshot::channel();

    unsafe {
        let sender: Box<Sender> = Box::new(sender);
        let ctx = Box::into_raw(sender).cast();

        unsafe extern "C"
        fn on_ready (ctx: *mut ffi::c_void)
        {
            let sender: Box<Sender> = Box::from_raw(ctx.cast());
            drop(sender);
        }

        let _job_id = ffi::start_job(ctx, Some(on_ready));
    }

    async move {
        let _ = receiver.await;
    }
}

Thank you all for your input.
I wish the C API provided a callback, but it doesn't. It can only be asked whether it has completed a job.
An interesting fact about this C API is that it's not truly async per se: it can only run one job at a time, i.e. a second call to start_job() will block until the first job is completed. But it will be consumed in a Rust async context, which is why I was thinking of making a future from it.

I wonder if it's worth the effort to spawn a dedicated waker thread that polls the API and wakes the future only once the job is done. All this effort is just to get a nice run_job().await syntax instead of going through spawn_blocking.
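
Roughly what I have in mind (just a sketch; run_job is a made-up name, and this assumes is_ready() can safely be called from another thread):

use ::std::{thread, time::Duration};
use ::tokio::sync::oneshot;

fn run_job() -> impl ::std::future::Future<Output = ()> {
    let job_id = unsafe { ffi::start_job() };
    let (tx, rx) = oneshot::channel::<()>();

    // Dedicated thread: poll the C API at a coarse interval, then signal once.
    thread::spawn(move || {
        loop {
            if unsafe { ffi::is_ready(job_id) } {
                break;
            }
            thread::sleep(Duration::from_millis(5));
        }
        let _ = tx.send(()); // the receiver may already be dropped; ignore
    });

    async move {
        let _ = rx.await; // woken exactly once, when the job is done
    }
}

That way the async task only gets woken once, when the job is actually done.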

In that case I guess one of the least bad things you can do is:

use ::std::{
    future::Future,
    ops::Not as _, // prefix `!` looks bad
    time::Duration,
};

//                          Only if `is_ready()` can be called
pub //                      vvvv           from another thread (no TLS in C library)
fn job () -> impl 'static + Send + Future<Output = ()>
{
    let job_id = unsafe { ffi::start_job() };
    async move {
        while unsafe { ffi::is_ready(job_id) }.not() { // prefix `!` looks bad
            const LATENCY: Duration = Duration::from_millis(100);
            let () = ::tokio::time::sleep(LATENCY).await;
        }
    }
}

We will have the same re-enqueuing that @Michael-F-Bryan was talking about, but with a big enough LATENCY, it should give plenty of breathing room to the CPU.


Well, that's not going to work. If it blocks until all existing jobs are complete, you'll lock up your entire application, and that defeats the purpose of using async.

If it were me I'd drop this "async" C function and see if it provides a synchronous version that can be run from a worker thread (see tokio::task::spawn_blocking()).
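
Even if you're stuck with just start_job()/is_ready(), a thin wrapper over spawn_blocking (a sketch; run_job is a made-up name) would still give you the run_job().await syntax while keeping the polling off the async worker threads:

async fn run_job() {
    ::tokio::task::spawn_blocking(|| {
        let id = unsafe { ffi::start_job() };
        loop {
            if unsafe { ffi::is_ready(id) } {
                break;
            }
            ::std::thread::sleep(::std::time::Duration::from_millis(5));
        }
    })
    .await
    .expect("the spawned blocking task panicked");
}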


So far, the only way I've found to deal with the blocking calls is something like this:

task::spawn_blocking(|| {
   let id = ffi::start_job();
   loop {
      if ffi::is_ready(id){break}
   }
});

I think it's a decent solution. I'll ask the developers of that API if they can add callbacks to their "async" functions 🙂

You may also want to throw in a std::thread::yield_now() or a std::thread::sleep() after every check so the core running the background thread isn't pegged at 100% CPU usage.

So maybe something like this:

while !ffi::is_ready(id) { 
  std::thread::sleep(Duration::from_millis(5)); 
}
