Help Integrating C library with Callback

I'm a Rust newb and need some help!

The current project I am working on is to replace an existing application written in Python with Rust. I am struggling with one of the components that integrates with an existing C library. The interface is quite simple and I have setup bindgen for automatically generating the rust -> C types.

The program seems to work; however, it crashes with a memory error upon shutdown. Specifically, I get this: tcache_thread_shutdown(): unaligned tcache chunk detected.

My main two objectives for this question are:

  1. Determine how to fix the crash.
  2. How can I add a better abstraction layer on top of the rust bindgen generated functions. Specifically, I would like to have a rust wrapper function that accepts a closure for start_sensor wrapper function.

All of the code can be found here: GitHub - SeeRich/mre-rust-cpp

Should be pretty simple to build/reproduce (Makefile included with instructions in the README). Please let me know if you have trouble running/reproducing this. I reduced the code as far as I could so others could easily run themselves (if wanted).

The main rust code (rs/src/sensor.rs):

  • The unsafe start/stop functions come directly from the C header file as well as DataPayload_t.
  • The example is a litte verbose because I was trying to keep it as close to the original as possible while still exhibiting the issue.
use crate::DataPayload_t;
use crate::{start, stop};

use std::ffi::c_void;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Callback from C library
extern "C" fn sensor_data_callback(data: *mut DataPayload_t, user_data: *mut c_void) {
    unsafe {
        if data.is_null() || user_data.is_null() {
            eprintln!("Invalid arguments in callback!");
            return;
        }

        println!("cb - {:?}", *data);

        let logger = Arc::from_raw(user_data as *mut DataLogger);
        logger.on_sensor_msg(&(*data));
    }
}

// Wrapper around starting sensor
fn start_sensor(user_data: *mut c_void) {
    unsafe {
        start(Some(sensor_data_callback), user_data);
    }
}

// Wrapper around stopping sensor
fn stop_sensor() {
    unsafe {
        stop();
    }
}

pub struct SensorInstance {
    inner: Arc<Mutex<SensorInstanceInner>>,
}

impl SensorInstance {
    pub fn new() -> Self {
        SensorInstance {
            inner: Arc::new(Mutex::new(SensorInstanceInner::new())),
        }
    }

    pub fn start(&self) {
        let mut inner = self.inner.lock().unwrap();
        inner.start();
    }

    pub fn stop(&self) {
        let mut inner = self.inner.lock().unwrap();
        inner.stop();
    }
}

impl Default for SensorInstance {
    fn default() -> Self {
        Self::new()
    }
}

struct DataLogger {}

impl DataLogger {
    pub fn new() -> Self {
        DataLogger {}
    }

    pub fn on_sensor_msg(&self, result: &DataPayload_t) {
        // Really does something more complex...
        println!("Logging data: {:?}", result);
    }
}

struct SensorInstanceInner {
    running: bool,
    data_logger: Option<Arc<DataLogger>>,
    sensor_thread: Option<JoinHandle<()>>,
}

impl SensorInstanceInner {
    fn new() -> Self {
        SensorInstanceInner {
            running: false,
            data_logger: None,
            sensor_thread: None,
        }
    }

    fn start_sensor_thread(&mut self) {
        println!("Starting sensor processing");

        // Spawn a thread that will call start sensor, this will block the thread until complete
        let logger = self.data_logger.clone().unwrap().clone();
        let handle = thread::spawn(|| {
            let user_data_ptr = Arc::into_raw(logger);
            start_sensor(user_data_ptr as *mut c_void);
        });
        self.sensor_thread = Some(handle);
    }

    fn start(&mut self) {
        if self.running {
            return;
        }

        // Create data logger
        let logger = Arc::new(DataLogger::new());
        self.data_logger = Some(logger.clone());

        self.start_sensor_thread();

        self.running = true;
    }

    fn stop(&mut self) {
        if !self.running {
            println!("ERROR - sensor is not running");
            return;
        }

        // Stop sensor processing
        stop_sensor();

        // Wait for thread
        if self.sensor_thread.is_some() {
            let _ = self.sensor_thread.take().unwrap().join();
        }

        self.running = false;
    }
}

The C header file (cpp/sensor.h):

#ifndef LIB_SENSOR_H
#define LIB_SENSOR_H

#include <stdbool.h>
#include <stdint.h>

/// Sensor data payload
typedef struct DataPayload_t
{
    uint64_t data_1;
    uint32_t data_2;
    bool data_3;
} DataPayload_t;

/// Callback for sensor data (void* is user data)
typedef void sensor_data_cb_t(DataPayload_t*, void*);

/// Start function to begin processing
void start(sensor_data_cb_t* sensor_data_cb, void* user_data);

/// Stop function to end processing
void stop();

#endif

Rust test code (rs/src/bin/test.rs):

use sensorrs::sensor::SensorInstance;

fn main() {
    let sensor = SensorInstance::new();
    sensor.start();
    std::thread::sleep(std::time::Duration::from_millis(300));
    sensor.stop();
}
1 Like

Don't know if it's the cause here, but if the callback is fired multiple times you would free the Arc multiple times.

Instead, simply use &*user_data.cast::<DataLogger>() or similar, and don't use from_raw() until you want to take ownership back from the C side, eg when you're unhooking the callback.

You could maybe play games with forget(), if you wanted to specifically pass an Arc to keep it alive on the Rust side, but I think that's dubious, probably at least library undefined behavior.

3 Likes

Thank you!

That's what I get for not reading/understanding the documentation.

The cast you suggested fixes the crash. Which makes sense from reading the docs on Arc::into_raw and Arc::from_raw.

Any idea how I can pass a closure to start_sensor instead of *mut c_void?

1 Like

Sure, here's the pattern I generally use:

use std::ffi::c_void;

fn start<F: Fn()>(f: F) {
    extern "C" {
        fn start(
          callback: extern "C" fn (data: *const c_void),
          data: *const c_void,
        );
    }
    
    unsafe {
        start(
            wrap_callback::<F>,
            Box::into_raw(Box::new(f)).cast(),
        );
    }
    
    extern "C" fn wrap_callback<F: Fn()>(data: *const c_void) {
        (unsafe { &*data.cast::<F>() })()
    }
}

fn main() {
    let count = std::cell::RefCell::new(0);
    start(move || {
        let mut count = count.borrow_mut();
        *count += 1;
        println!("hello {}", *count);
    });
}

Ideally, you should be conservative and use Fn() but you can use FnMut() if you can prove it's not re-entrant. Or you could add + Sync and let it be called from multiple threads!

3 Likes

in rust, each closure has a distinct type that is unnamed but only usable through the Fn faimily traits. usually you use generic types when you need to use closures.

you can create a generic "shim" callback wrapper function which remembers and recovers the original type and then forward the call. example:

the "shim" callback function which you pass to the C side
this example I use FnMut(), assuming the callback will not be called concurrently.
use Fn() if that's not guaranteed.

extern "C" fn callback_shim<F: FnMut(&DataPayload_t)>(data: *mut DataPayload_t, user_data: *mut c_void) {
    // SAFETY: `start_sensor()` guarantees the erased pointer has the correct type
    let f: &mut F = unsafe { &mut *(userdata as *mut F)  };
    // SAFETY: signature of `F` guarantees the borrow has valid lifetime
    f(unsafe { &*data })
}

the start_sensor accept a closure as callback and erase the type, which is later recovered by the callback shim.

fn start_sensor<F: FnMut(&DataPayload_t)>(callback: F) {
    // depending on the C API, you can decide how to handle the cleanup
    // in this example I simply box and leak it, for simplicity sake
    let callback = Box::into_raw(Box::new(callback));
    // SAFETY: assume the ffi API is well behaved
    unsafe {
        start(Some(callback_sim::<F>), callback as _);
    }
}
2 Likes

If it helps, I wrote an article about exactly this a couple years ago.

3 Likes

I really like this solution! How does this get around the issue of Rust closures being "Fat pointers"? I was under the impression I would need something like Box<dyn Fn(...)->...>? Is it simply because the code you shared doesn't use dyn...?

Also, if I wanted to use Fn(&DataPayload_t) instead of FnMut(&DataPayload_t)` as this is will need to be thread safe, how would that change things?

The following code is working now without any crashes, leaks, or address sanitizer errors:

extern "C" fn callback_shim<F: Fn(&DataPayload_t) + Send + Sync>(
    data: *mut DataPayload_t,
    user_data: *mut c_void,
) {
    // SAFETY: `start_sensor()` guarantees the erased pointer has the correct type
    let f: &F = unsafe { &*(user_data as *mut F) };
    // let f: &mut F = unsafe { &mut *(user_data as *mut F) };
    // SAFETY: signature of `F` guarantees the borrow has valid lifetime
    f(unsafe { &*data })
}

fn start_sensor<F: Fn(&DataPayload_t) + Send + Sync>(callback: F) {
    let callback = Box::into_raw(Box::new(callback));
    // SAFETY: assume the ffi API is well behaved
    unsafe {
        start(Some(callback_shim::<F>), callback as _);
    }
    // Drop the callback to clean up memory
    let _ = unsafe { Box::from_raw(callback) };
}
1 Like

references to closures are NOT fat: a closure is just an anonymous struct, but it is statically sized, only dynamically sized types need "fat" pointers.

quite the opposite: dyn Fn() is a dynamically sized type, so the pointer (be it a Box, a reference, or a raw pointer) must be fat. if you use dyn Fn(), you'll need "double" box it, i.e. a (thin) box pointing to a (fat) box pointing to a dyn Fn().

using Fn() instead of FnMut() makes the caller easier to implement (less chance to make error) because you can only need a shared/immutable reference to call the callback which means you can call it concurrently.

the trade-off is the callback is more restricted regarding what it can do, since it cannot mutate any states (without explicit interior mutability).

1 Like

forget to mention, Fn() only gives you safe concurrency for single thread. if you need thread safety, you'll also need the Sync bound, as you already figured out.

the Send bound, however, may or may not be necessary, depending on how the ffi library is implemented. but anyway it's better to be conservative and use it (unless it over-restricts the callback implementations, which is very unlikely).

1 Like

This is exactly what I needed, thank you for the perfect explanation!

I was making things over complicated due to my misunderstanding of the pointer size for closures.

I posted all the final working code here: GitHub - SeeRich/mre-rust-cpp

It was tested with valgrind and llvm address + leak sanitizers as well.

1 Like