The contract is that callback is invoked later, on a different thread. What is the most idiomatic way to convert that API to Futures, preferably using only stdlib?
if the binding is targeting specific async runtime, then an async channel with sync(blocking) API is probably the simplest solution, the callback just need to call tx.send(), it will wake the awaiting task automatically.
for a stdlib only solution, I would like to use a normal sync channel to pass the data, it's not very different from Arc<Mutex<Option<T>>> or Arc<OnceLock<T>>, I just like the channel API store an Option<T> in the callback context needs less indirection and heap allocation. you also need a waker registration to store the waker so the callback can wake the future.
then just pass the waker and the sender as context to the callback, poll (try_recv()) the channel receiver in the future:
EDIT: single heap allocation is enough
/// this is wrapped in a `Mutex`
struct CallbackContext {
// "latest" registered waker
registration: Waker,
// send data from callback to future
data: Option<i32>,
}
pub async fn foo(&mut self) -> i32 {
let mut callback_context: Option<Arc<Mutex<CallbackContext>>> = None;
poll_fn(move |cx| {
if let Some(callback_context) = callback_context.as_ref() {
// not first time being polled, but may be spurious wakeup
let mut callback_context = callback_context.lock().unwrap();
// try recv result
if let Ok(result) = callback_context.data.take() {
return Poll::Ready(result);
}
if !callback_context.registration.will_wake(cx.waker()) {
callback_context.registration.clone_from(cx.waker());
}
} else {
// first time polled, prepare callback context, call ffi with callback
let callback_context_new = Arc::new(Mutex::new(CallbackContext {
registration: cx.waker().clone(),
data: None,
}));
callback_context = Some(Arc::clone(&callback_context_new));
unsafe extern "C" fn callback(context: *mut c_void, result: i32) {
let callback_context = Arc::from_raw(context.cast::<Mutex<CallbackContext>>());
let mut callback_context = callback_context.lock().unwrap()
callback_context.data.replace(result);
callback_context.registration.wake_by_ref();
}
unsafe {
foo(self.0, Arc::into_raw(callback_context_new), callback);
}
}
Poll::Pending
})
.await
}
There is no need to involve Tokio if you are not already using Tokio — use futures_channel::oneshot. (Either one will work regardless of choice of executor, but futures_channel is a much smaller library.)
And as @nerditation demonstrates, you can also write your own oneshot, but note that the correct implementation is actually quite subtle — for example, it might look like you can move the rx.try_recv() outside of the lock() scope, but if you do that, you will get a lost-wakeup bug which nondeterministically occurs in the case where the result is delivered to the channel by the other thread right after rx.try_recv() fails but before the new waker is registered.
In my opinion, using a oneshot channel type is the correct and readable way to convert a callback API to a future API.
How would you do the C interop here? Redundantly box the Sender and turn the Box into a pointer? Unfortunately I don't see from_raw / into_raw functions on Sender, which could forward to the corresponding functions on Arc.
Oh, I'm usually not thinking about specifically C-style callbacks. Needing that ability to fit it into a single pointer is a good reason to use a custom implementation of a oneshot. (Or perhaps there’s some library that offers such a thing.)