Correct Way to Interface Async Code

I have a third party SDK that I must integrate with. Bundled within this SDK is a threadpool that asynchronously completes tasks. It exposes an interface like this:

/// Opaque context type
typedef struct ctx ctx_t;

/// Opaque object type
typedef struct my_obj my_obj_t;

my_obj_t * allocate_obj();

/// Callback type
typedef void (*done_callback)(void *udata, my_obj_t *obj, int error_code);

/// Schedule obj for modification. This will be pushed into a threadpool and eventually executed.
/// When execution is complete, done_callback will be called
void schedule_mutation(ctx_t * ctx, my_obj_t *obj, done_callback *callback, void * udata);

I am trying to implement schedule_mutation as an async function, inside of Rust. Below is a rough skeleton of an implementation:

  • Assume module ext contains bindgen-like bindings to the external api
  • Ignore the following mistakes, omitted due to laziness brevity:
    • Potentially spurious waker clones
    • Never freeing shared future struct

I only wish to ask if this is the right direction.

use std::{
    cell::RefCell,
    ffi::c_void,
    future::Future,
    marker::PhantomPinned,
    pin::Pin,
    ptr::NonNull,
    sync::Mutex,
    task::{self, Poll, Waker},
};

pub struct MyLibrary(NonNull<ext::ctx_t>);

impl MyLibrary {
    pub unsafe fn schedule_mutation(&mut self) -> ObjMutationFuture {
        let obj = ext::allocate_obj();

        let future = ObjMutationFuture::new();
        ext::schedule_mutation(
            self.0.as_ptr(),
            obj,
            Some(obj_callback),
            future.inner_ptr() as *mut c_void,
        );
        future
    }
}

pub extern "C" fn obj_callback(inner: *mut c_void, obj: *mut ext::my_obj_t, error_code: i32) {
    unsafe {
        let inner: &Mutex<ObjMutationFutureInner> =
            &*(inner as *const Mutex<ObjMutationFutureInner>);
        let mut inner = inner.lock().unwrap();
        inner.obj = obj;
        inner.error_code = error_code;
        inner.done = true;
        if let Some(waker) = inner.waker.take() {
            waker.wake();
        }
    }
}

struct ObjMutationFutureInner {
    /// The waker to fulfill
    waker: Option<Waker>,
    /// The actual result
    obj: *mut ext::my_obj_t,
    error_code: i32,
    done: bool,
    /// Make extra-sure it's pinned
    _pin: PhantomPinned,
}

pub struct ObjMutationFuture(*const Mutex<ObjMutationFutureInner>);

impl ObjMutationFuture {
    fn new() -> ObjMutationFuture {
        ObjMutationFuture(Box::into_raw(Box::new(Mutex::new(
            ObjMutationFutureInner {
                waker: None,
                obj: std::ptr::null_mut(),
                error_code: 0,
                done: false,
                _pin: PhantomPinned,
            },
        ))))
    }

    unsafe fn inner_ptr(&self) -> *const Mutex<ObjMutationFutureInner> {
        self.0
    }
}

impl Future for ObjMutationFuture {
    type Output = Result<*mut ext::my_obj_t, i32>;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mutex = unsafe { &*self.0 };
        let mut inner = mutex.lock().unwrap();
        if inner.done {
            // TODO: destroy inner future object
            if inner.error_code == 0 {
                Poll::Ready(Ok(inner.obj))
            } else {
                Poll::Ready(Err(inner.error_code))
            }
        } else {
            // TODO: add would_wake checks to avoid spurious clones
            inner.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

In short, I Box::into_raw(Box::new(...)), my way into a state object pointer, give one pointer to the SDK, and another pointer to the future. Future polls lock the underlying object, check if it's done, and update the waiter. The SDK callback locks the object, stores the result, and calls wake.

I have a few questions:

  • Is this the right approach? Have I missed some general important concept?
  • Is there any way to avoid the mutex? I assume no, since poll can be spuriously called at any point, and I must enforce unique access to the Inner struct.
  • Am I missing Pin somewhere? Is my usage of PhantomPinned sufficient to force ObjMutationFutureInner to be pinned? Do I need to include a Box::pin to my future creation step?
  • Would it be more idiomatic to delay the sdk call ext::schedule_mutation to the first poll call?
  • Is there any way to avoid boxing the entire Inner structure, and store the state directly in the future object?
  • Is calling wake while the lock is held a bad idea?

Thinking more on this, I think I am doing this wrong.

I should move the ext::schedule_mutation call into the future, and store the response inside the future without a box. This avoids the allocation, and let's me take advantage of the fact my future is Pined only after it has been returned.

Because your future doesn't do anything when polled, just reports the result of an operation, consider replacing it entirely with a oneshot channel. The callback owns the sending side of the channel, and ObjMutationFuture is replaced with (or is a wrapper for) the receiving side. What you've implemented is basically a oneshot channel entangled with FFI code.

By using an existing channel implementation, you should be able to have fewer cases to handle and fewer bugs.

You might be able to use only atomics and not a mutex, and get a wait-free implementation. That's much trickier to get right, though, since you have to handle all possible interleavings of individual atomic operations, rather than doing multiple things while you've locked the mutex.

In your code, your shared state is independently heap-allocated so you don't need pinning; you own the ObjMutationFutureInner so you control when it moves, and Pin won't help you not move it. PhantomPinned and Pin are for telling some other owner you don't control that they shouldn't move the value.

This is only sound if there is a way to cancel the mutation callback, and you do so in impl Drop for ObjMutationFuture. Otherwise, you have a use-after-free bug where if the future is dropped, the SDK callback will still try to access the future's memory.

If there is a way to cancel the callback, go ahead and do this. Otherwise, you need some kind of shared allocation, and a oneshot channel is a nice prepackaged way to get that.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.