Bridging callbacks and futures

Let's say I have a low-level C API that exposes a callback-based operation:

struct Widget;

void foo(struct Widget *widget, void *context, void (*callback)(void *, int32_t));

The contract is that callback is invoked later, on a different thread. What is the most idiomatic way to convert that API to Futures, preferably using only stdlib?

struct Widget {
  c: *mut c::Widget
}

impl Widget {
    pub async fn foo(&mut self) -> i32 {
        ...
    }
}

A simple non-std solution is Tokio's oneshot channel.

If you want a custom implementation, then at minimum you need to store the result somewhere and call the waker.
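As a rough illustration of "store the result somewhere and call the waker", here is a minimal std-only sketch. The names `Shared`, `CallbackFuture`, `complete`, `spawn_operation` and `block_on` are all made up for this example, and a plain thread stands in for the C library calling back later; it is not the real FFI binding.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// Hypothetical shared slot: the result plus the most recently registered waker.
struct Shared {
    result: Option<i32>,
    waker: Option<Waker>,
}

/// Hypothetical future that becomes ready once `complete` has stored a value.
struct CallbackFuture(Arc<Mutex<Shared>>);

impl Future for CallbackFuture {
    type Output = i32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        let mut shared = self.0.lock().unwrap();
        if let Some(value) = shared.result.take() {
            return Poll::Ready(value);
        }
        // Register the waker while still holding the lock, so a result that
        // arrives right after the check above cannot be missed.
        shared.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// What the callback side does: store the result, then wake the task.
fn complete(shared: &Mutex<Shared>, value: i32) {
    let waker = {
        let mut guard = shared.lock().unwrap();
        guard.result = Some(value);
        guard.waker.take()
    };
    // Wake after releasing the lock.
    if let Some(waker) = waker {
        waker.wake();
    }
}

/// Starts a thread that plays the role of the C library invoking the callback.
fn spawn_operation(value: i32) -> CallbackFuture {
    let shared = Arc::new(Mutex::new(Shared { result: None, waker: None }));
    let for_callback = Arc::clone(&shared);
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        complete(&for_callback, value);
    });
    CallbackFuture(shared)
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny single-future executor, only here so the sketch can be driven
/// without any runtime crate.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = std::pin::pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return v_or(value),
            Poll::Pending => thread::park(),
        }
    }
}

/// Identity helper; kept trivial so the return path above is explicit.
fn v_or<T>(value: T) -> T {
    value
}
```

Driving it with `block_on(spawn_operation(7))` should yield the value the stand-in thread passed to `complete`. In a real binding the `Arc` would be turned into the `void*` context instead of being moved into a thread.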

If the binding targets a specific async runtime, then an async channel with a sync (blocking) API is probably the simplest solution: the callback just needs to call tx.send(), and that wakes the awaiting task automatically.

For a stdlib-only solution, you could use a normal sync channel to pass the data, but that is not very different from Arc<Mutex<Option<T>>> or Arc<OnceLock<T>>, and storing an Option<T> directly in the callback context needs less indirection and heap allocation. You also need a waker registration to store the waker so the callback can wake the future.

Then just pass the waker registration and the result slot as the context to the callback, and check the slot (data.take()) when the future is polled:

EDIT: single heap allocation is enough

use std::{
	ffi::c_void,
	future::poll_fn,
	sync::{Arc, Mutex},
	task::{Poll, Waker},
};

/// this is wrapped in a `Mutex`
struct CallbackContext {
	// "latest" registered waker
	registration: Waker,
	// send data from callback to future
	data: Option<i32>,
}

pub async fn foo(&mut self) -> i32 {
	let mut callback_context: Option<Arc<Mutex<CallbackContext>>> = None;
	poll_fn(move |cx| {
		if let Some(callback_context) = callback_context.as_ref() {
			// not first time being polled, but may be spurious wakeup
			let mut callback_context = callback_context.lock().unwrap();
			// try recv result
			if let Some(result) = callback_context.data.take() {
				return Poll::Ready(result);
			}
			if !callback_context.registration.will_wake(cx.waker()) {
				callback_context.registration.clone_from(cx.waker());
			}
		} else {
			// first time polled, prepare callback context, call ffi with callback
			let callback_context_new = Arc::new(Mutex::new(CallbackContext {
				registration: cx.waker().clone(),
				data: None,
			}));
			callback_context = Some(Arc::clone(&callback_context_new));
			unsafe extern "C" fn callback(context: *mut c_void, result: i32) {
				// SAFETY: `context` is the pointer produced by `Arc::into_raw` below;
				// this assumes the C API invokes the callback exactly once
				let callback_context = unsafe { Arc::from_raw(context.cast::<Mutex<CallbackContext>>()) };
				let mut callback_context = callback_context.lock().unwrap();
				callback_context.data.replace(result);
				callback_context.registration.wake_by_ref();
			}
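			unsafe {
				// `self.c` is the raw widget pointer; the `Arc` is handed to C as an opaque `void*`
				foo(self.c, Arc::into_raw(callback_context_new) as *mut c_void, callback);
			}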
		}
		Poll::Pending
	})
	.await
}

What happens if the C widget is freed before the callback is called?

There is no need to involve Tokio if you are not already using Tokio — use futures_channel::oneshot. (Either one will work regardless of choice of executor, but futures_channel is a much smaller library.)

And as @nerditation demonstrates, you can also write your own oneshot, but note that a correct implementation is actually quite subtle. For example, it might look like you can move the rx.try_recv() outside of the lock() scope, but if you do that, you will get a lost-wakeup bug. It occurs nondeterministically, in the case where the other thread delivers the result to the channel right after rx.try_recv() fails but before the new waker is registered.

In my opinion, using a oneshot channel type is the correct and readable way to convert a callback API to a future API.


use futures_channel::oneshot

How would you do the C interop here? Redundantly box the Sender and turn the Box into a pointer? Unfortunately I don't see from_raw / into_raw functions on Sender, which could forward to the corresponding functions on Arc.
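For reference, the "redundantly box the Sender" approach might look roughly like the sketch below. It uses `std::sync::mpsc::Sender` purely as a stand-in for a oneshot sender so the example needs no external crate, and `fake_foo`, `on_done` and `start` are hypothetical names; `fake_foo` imitates the C function by invoking the callback from another thread.

```rust
use std::ffi::c_void;
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Callback with the C-compatible shape from the question.
unsafe extern "C" fn on_done(context: *mut c_void, result: i32) {
    // SAFETY: `context` was produced by `Box::into_raw` in `start`, and this
    // sketch assumes the callback runs exactly once, so the box is
    // reclaimed (and freed) exactly once.
    let sender = unsafe { Box::from_raw(context.cast::<Sender<i32>>()) };
    // Ignore the error case where the receiver has already been dropped.
    let _ = sender.send(result);
}

/// Hypothetical stand-in for the C function: calls back from another thread.
fn fake_foo(
    context: *mut c_void,
    callback: unsafe extern "C" fn(*mut c_void, i32),
    value: i32,
) {
    // Raw pointers are not `Send`, so carry the address across as an integer.
    let addr = context as usize;
    thread::spawn(move || unsafe { callback(addr as *mut c_void, value) });
}

/// Boxes the sender so that it fits in a single `void*` context pointer.
fn start(value: i32) -> Receiver<i32> {
    let (tx, rx) = mpsc::channel();
    // This is the extra heap allocation the question calls redundant.
    let context = Box::into_raw(Box::new(tx)).cast::<c_void>();
    fake_foo(context, on_done, value);
    rx
}
```

The extra `Box` is what turns the sender into one thin pointer. If the callback is never invoked, the boxed sender leaks, which is the same ownership hazard as with `Arc::into_raw`.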

Oh, I'm usually not thinking about specifically C-style callbacks. Needing that ability to fit it into a single pointer is a good reason to use a custom implementation of a oneshot. (Or perhaps there’s some library that offers such a thing.)

Created an issue suggesting the addition of into_raw / from_raw to futures_channel::oneshot::Sender.
