Integrate Rust async with foreign API: how close Sink/Stream?

I want solve simple problem, I have C API that allow to user users it's own
network stack in async way.

It is rather simple, you have to implement:

// start connection and imediatly return
fn c_api_start_connecting(token: *mut *mut c_void)
// start sending data and imediatly return
fn c_api_start_writing(token: *mut c_void, ...)
// start closing connection procedure and immediatly return
fn c_api_time_to_close(token: *mut c_void, ...)

and you have to call c_api_callback_on_new_data every time when you get data from network.

Looks simple, but I can not imagine how to implment this on Rust.
The problem how to stop the system.

For example if I use C API to hold Arc<Mutex<NetStream>>,
then I have to lock Mutex to start sending data, but at the same time I have to wait new incomming data with lock().unwrap().for_each.

If I use Arc<(Mutex<Sink>, Mutex<Stream>)> then I can write and read at the same time,
but I can not close, because of for reading I have to lock Stream.
I can time to time unlock Stream, but this extra energy consumption,
and looks like hack.

Any thoughts to how close NetStream (struct that implement Sink/SinkExt, Stream/StreamExt)
at any time and at the same make possible to write/read continiously?

Psedo code of my current thoughts:

unsafe fn c_api_start_connecting(token: *mut *mut c_void) {
    let state = Arc::new(Mutex::new(None));
    let state2 = state.clone();
    *token = Arc::into_raw(state);
    spawn(async move {
        let stream = connect().await;
        *state2.lock().unwrap() = Some(stream);
        
        spawn(async {
            // as Stream
            state2.lock().unwrap().
                for_each(|msg| c_api_callback_on_new_data(msg));
        });
    });
}
unsafe fn c_api_start_writing(token: *mut c_void, ...) {
    let state_tmp = Arc::from_raw(token);
    let state = state_tmp.clone();
    std::mem::forget(state_tmp);
    spawn(async move {
        //as Sink
        state.lock().unwrap().send(...).await;        
    });
}

unsafe fn c_api_time_to_close(token: *mut c_void, ...) {
    let state = Arc::from_raw(token);
    spawn(async move {
        state.lock().unwrap().close().await;
    });
}

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.