Futures wrapping an async C library


#1

Hey All. I’m working on the Paho MQTT messaging library. The initial Rust version does not have Futures or Tokio support, but adding that support seems like the next logical step. The library wraps the Paho C lib, which is a fairly classic asynchronous C library that uses callbacks to signal when an operation has completed or failed.

I’m just getting up to speed on Futures. Can anyone point me to some tutorials or existing projects that show how to do this type of thing (wrapping async callbacks) using current best practices? I have found a few projects, but I’m not sure which are good, bad, or completely out of date.

Thanks!


#2

The up-to-date docs should be here: https://tokio.rs/docs/futures/basic/ (although Rust is in the process of switching from “futures 0.1” to a new version in the standard library, so things will be in flux).

In general, you need to write your own struct that implements the Future trait.

  1. poll must report whether the callback has already happened.
  2. poll must save the most recent task::current() value so that task.notify() can be called when the callback arrives.

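use std::sync::{Arc, Mutex};

use futures::task::{self, Task};
use futures::{Async, Future};
use once_cell::sync::OnceCell;

pub struct CallbackFuture<T, E> {
    // Task to notify when the callback fires; shared with the callback handler.
    task: Arc<Mutex<Option<Task>>>,
    // Result of the operation, written once by the callback handler.
    data: Arc<OnceCell<Result<T, E>>>,
}

impl<T: Clone, E: Clone> Future for CallbackFuture<T, E> {
    type Item = T;
    type Error = E;

    fn poll(&mut self) -> Result<Async<T>, E> {
        // Lock before checking the result. As long as the callback handler sets
        // `data` first and only then locks `task` to notify, a completion cannot
        // slip in between the check and the task registration.
        let mut t = self.task.lock().unwrap();
        match self.data.get() {
            None => {
                // Not finished yet: remember the task that polled most recently.
                *t = Some(task::current());
                Ok(Async::NotReady)
            },
            Some(Ok(ok)) => Ok(Async::Ready(ok.clone())),
            Some(Err(err)) => Err(err.clone()),
        }
    }
}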

The task and data fields are wrapped in Arc (with once_cell’s OnceCell holding the result) so that the code receiving the callback can keep its own clones of them, fill in the result, and call notify() when the callback arrives.
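For anyone who wants to see the callback side as well, here is a rough sketch of the same shape using only the standard library, i.e. std::future and Waker rather than futures 0.1 and Task, with std's OnceLock standing in for once_cell. The names Completer, callback_pair, block_on and demo are made up for illustration, and the thread in demo only stands in for the C library firing its callback.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex, OnceLock};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// Future side: polled by the executor.
pub struct CallbackFuture<T, E> {
    waker: Arc<Mutex<Option<Waker>>>,
    data: Arc<OnceLock<Result<T, E>>>,
}

/// Callback side (hypothetical name): kept by whatever receives the callback.
pub struct Completer<T, E> {
    waker: Arc<Mutex<Option<Waker>>>,
    data: Arc<OnceLock<Result<T, E>>>,
}

/// Creates the two halves, which share the waker slot and the result cell.
pub fn callback_pair<T, E>() -> (CallbackFuture<T, E>, Completer<T, E>) {
    let waker = Arc::new(Mutex::new(None));
    let data = Arc::new(OnceLock::new());
    (
        CallbackFuture { waker: waker.clone(), data: data.clone() },
        Completer { waker, data },
    )
}

impl<T, E> Completer<T, E> {
    /// Stores the result first, then wakes the task that polled last, if any.
    pub fn complete(self, result: Result<T, E>) {
        let _ = self.data.set(result);
        if let Some(w) = self.waker.lock().unwrap().take() {
            w.wake();
        }
    }
}

impl<T: Clone, E: Clone> Future for CallbackFuture<T, E> {
    type Output = Result<T, E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Take the lock before checking the result, so a completion cannot
        // land between the check and the waker registration.
        let mut slot = self.waker.lock().unwrap();
        match self.data.get() {
            Some(res) => Poll::Ready(res.clone()),
            None => {
                *slot = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor, only here to make the sketch runnable.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(v) => return v,
            Poll::Pending => thread::park(),
        }
    }
}

/// Completes the future from another thread, as a C callback might.
fn demo() -> Result<u32, String> {
    let (fut, completer) = callback_pair::<u32, String>();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        completer.complete(Ok(7));
    });
    block_on(fut)
}
```

In a real program the executor (Tokio or similar) would take the place of block_on; the only parts that matter are complete() and poll() agreeing on the order of "set result, then wake".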


#3

Oh. That’s awesome. Thank you.

Do you know of any libraries that implement this type of behavior?


#4

To my surprise — no. That’s why I wrote that bit above.


#5

OK, cool. I got this running. Instead of the once_cell, I wrapped the result and a “done” flag in the same mutex as the task. This mapped almost exactly to the existing condition variable implementation that I’m replacing.
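Roughly, the single-mutex shape looks like the sketch below. This is not the actual paho-mqtt code: it uses std::future instead of futures 0.1 so it runs without extra crates, and Token, Inner, context, on_complete and the i32 return code (0 meaning success) are stand-ins I made up for illustration. The context pointer trick (Arc::into_raw going out, Arc::from_raw in the callback) is one common way to hand shared state to a C-style callback.

```rust
use std::ffi::c_void;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Everything the callback and the future share, behind one mutex.
struct Inner {
    done: bool,           // set once the callback has fired
    rc: i32,              // hypothetical C-style return code, 0 = success
    waker: Option<Waker>, // task that polled most recently
}

/// Hypothetical token tracking one asynchronous operation.
pub struct Token {
    inner: Arc<Mutex<Inner>>,
}

impl Token {
    pub fn new() -> Self {
        let inner = Inner { done: false, rc: 0, waker: None };
        Token { inner: Arc::new(Mutex::new(inner)) }
    }

    /// Context pointer to hand to the C library. It owns one Arc reference,
    /// which `on_complete` reclaims.
    pub fn context(&self) -> *mut c_void {
        Arc::into_raw(self.inner.clone()) as *mut c_void
    }
}

/// Hypothetical completion callback with a C-style signature.
///
/// Safety: `context` must come from `Token::context` and be passed here
/// exactly once.
pub unsafe extern "C" fn on_complete(context: *mut c_void, rc: i32) {
    let inner = unsafe { Arc::from_raw(context as *const Mutex<Inner>) };
    let waker = {
        let mut guard = inner.lock().unwrap();
        guard.done = true;
        guard.rc = rc;
        guard.waker.take()
    };
    // Wake after releasing the lock so the woken task can poll right away.
    if let Some(w) = waker {
        w.wake();
    }
}

impl Future for Token {
    type Output = Result<(), i32>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The flag check and the waker registration happen under one lock.
        let mut guard = self.inner.lock().unwrap();
        if guard.done {
            Poll::Ready(if guard.rc == 0 { Ok(()) } else { Err(guard.rc) })
        } else {
            guard.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Helper for the demo: counts wake-ups instead of driving an executor.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Drives one token by hand: poll, fire the callback, poll again.
/// Returns (first poll was pending, number of wake-ups, second poll result).
fn demo(rc: i32) -> (bool, usize, Poll<Result<(), i32>>) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut token = Token::new();
    let ctx = token.context();
    let first_pending = Pin::new(&mut token).poll(&mut cx).is_pending();
    unsafe { on_complete(ctx, rc) };
    let wakes = counter.0.load(Ordering::SeqCst);
    (first_pending, wakes, Pin::new(&mut token).poll(&mut cx))
}
```

Keeping the flag, the result, and the waker under one lock means poll and the callback can never disagree about whether the operation has finished, which is the same guarantee the mutex plus condition variable version gave.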

But I also just stumbled on the futures oneshot. How would this compare to giving the tx side of a oneshot to the callback handler? Might there be performance issues or better flexibility one way or the other?

I’ll start looking into the implementation of the oneshot for hints.


#6

In case anyone stumbles on this…

I have an implementation of this up on GitHub in the Eclipse MQTT library (paho-mqtt), which wraps the asynchronous Paho C lib. The “Token” objects in the Rust library track asynchronous operations and implement the Future trait, similar to what @kornel recommended.
token.rs

The client also uses a futures::sync::mpsc channel for receiving messages from the remote server and passing them on to the application. Thus the application sees them through a futures Stream.
async_client.rs

This combination seems to be working pretty well.