A bidi-streaming gRPC server implementation for FFI fails about half the time

I'm trying to use Tonic to build a gRPC server that can handle bidirectional streaming RPC calls.

I tried to reduce the problem to a minimal example, which can be run with:

cargo run --bin server
cargo run --bin client

Over multiple runs, about half of the time the client does not print anything, and the server blocks on .next() after sending a few messages.

The RPC is declared as follows:

rpc Echo(stream HelloRequest) returns (stream HelloReply);

where each echo payload has only one field, of type string.

The implementation is intended to be used with FFI, since the main RPC functionality is provided by calling an FFI function. So I tried to expose an interface for the streaming part.

In the service trait:

    type EchoStream = Pin<Box<dyn Stream<Item = Result<HelloReply, Status>> + Send + 'static>>;

    async fn echo(
        &self,
        request: Request<Streaming<HelloRequest>>,
    ) -> Result<Response<Self::EchoStream>, Status> {
        {
            let f = self.echo;
            let request = request.into_inner();
            let (tx, rx) = mpsc::unbounded_channel::<Result<HelloReply, Status>>();
            let response = UnboundedReceiverStream::new(rx);

            spawn(async move {
                println!("begin spawn_blocking");
                let tx: Box<UnboundedSender<Result<HelloReply, Status>>> = box tx;
                let tx = Box::into_raw(tx) as *mut libc::c_void;
                let rx: Box<Streaming<HelloRequest>> = box request;
                let rx = Box::into_raw(rx) as *mut libc::c_void;
                f(tx, rx);
                println!("end spawn_blocking");
            });

            let response = Box::pin(response);
            let response = response as Self::EchoStream;
            Ok(Response::new(response))
        }
    }

Here self.echo and f stand for the FFI function. Its arguments are

  • the raw pointer to the sender of response stream
  • the raw pointer to the Streaming<HelloRequest>
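As a minimal sketch of the pointer hand-off described above, the following uses only the standard library: a std::sync::mpsc Sender stands in for the Tokio sender, and the helper names (sender_into_raw, send_via_raw, sender_free) are hypothetical and not part of the original project. It assumes the foreign side treats the pointer as opaque and eventually hands it back so the Rust side can reclaim it.

```rust
use std::ffi::c_void;
use std::sync::mpsc::{channel, Sender};

/// Box the sender and hand it out as an opaque pointer (hypothetical helper).
fn sender_into_raw(tx: Sender<String>) -> *mut c_void {
    Box::into_raw(Box::new(tx)) as *mut c_void
}

/// Borrow the sender behind the pointer without taking ownership.
///
/// Safety: `ptr` must come from `sender_into_raw` and must not have been freed.
unsafe fn send_via_raw(ptr: *mut c_void, msg: &str) -> bool {
    let tx = unsafe { &*(ptr as *const Sender<String>) };
    tx.send(msg.to_owned()).is_ok()
}

/// Reclaim ownership so the sender is dropped and the channel closes.
///
/// Safety: `ptr` must come from `sender_into_raw` and must be freed only once.
unsafe fn sender_free(ptr: *mut c_void) {
    drop(unsafe { Box::from_raw(ptr as *mut Sender<String>) });
}

fn round_trip() -> Vec<String> {
    let (tx, rx) = channel();
    let raw = sender_into_raw(tx);
    unsafe {
        send_via_raw(raw, "hello");
        send_via_raw(raw, "world");
        sender_free(raw);
    }
    // The iterator ends because the only sender has been dropped.
    rx.iter().collect()
}

fn main() {
    println!("{:?}", round_trip());
}
```

Without the final Box::from_raw, the boxed value is never dropped, so the receiving side would never observe the channel closing.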

To use the raw pointer to the streaming request, there is this function:

#[repr(C)]
pub struct ByteString {
    ptr: *const u8,
    len: usize,
}

#[no_mangle]
pub unsafe extern "C" fn next_hello_request_name(stream: *mut libc::c_void) -> *const ByteString {
    let stream = stream as *mut Streaming<HelloRequest>;
    let stream = stream.as_mut().unwrap();
    let request = {
        println!("block on next");
        let ret = block_on(stream.next());
        println!("next done");
        ret
    };

    let (ptr, len) = match request {
        Some(request) => {
            let request: HelloRequest = request.unwrap();
            let name: String = request.name;
            let (ptr, len, _) = name.into_raw_parts();
            (ptr as *const u8, len)
        }
        None => (ptr::null(), 0),
    };
    let bs = box ByteString { ptr, len };
    Box::into_raw(bs)
}

And for the sending part:

#[no_mangle]
pub unsafe extern "C" fn send_hello_reply(sender: *mut libc::c_void, x: *mut ByteString) {
    println!("enter send");
    let sender = sender as *mut UnboundedSender<Result<HelloReply, Status>>;
    let sender = sender.as_ref().unwrap();
    println!("do send");

    let x: Box<ByteString> = Box::from_raw(x);
    let (ptr, len) = (x.ptr, x.len);
    let x = String::from_raw_parts(ptr as *mut u8, len, len);
    let message = x.clone();
    forget(x);
    let x = HelloReply { message };
    let x = Ok(x);

    println!("block on send");
    tokio::sync::mpsc::UnboundedSender::send(sender, x).unwrap();
    println!("send done");
}

All extern "C" FFI functions are sync. Finally, here is the function that starts the server:

#[no_mangle]
pub extern "C" fn main() {
    let addr = HOST_PORT.parse().unwrap();

    let greeter = GreeterImpl { echo };

    let future = async {
        Server::builder()
            .add_service(GreeterServer::new(greeter))
            .serve(addr)
            .await
            .unwrap();
    };

    let runtime = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(future)
}

Here is a runnable example: https://github.com/alissa-tung/issue

I am wondering whether it is a bad idea to pass these pointers to FFI, and what the idiomatic usage would be. Am I also using async incorrectly, since the async function is called from a sync context, while that sync function is itself called in an async context?

Are you sure you want to use spawn rather than spawn_blocking in echo?
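To illustrate why this distinction can matter, here is a rough standard-library analogy, not Tonic or Tokio code. It assumes the hang comes from a blocking call occupying the only thread that could also deliver the data being waited for. A single worker thread stands in for the current-thread runtime, and spawn_worker and run are hypothetical names.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;
use std::time::Duration;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// One thread that runs queued jobs one at a time, standing in for a
/// current-thread runtime that can only progress one task at once.
fn spawn_worker() -> (Sender<Job>, thread::JoinHandle<()>) {
    let (queue, jobs) = channel::<Job>();
    let handle = thread::spawn(move || {
        for job in jobs {
            job();
        }
    });
    (queue, handle)
}

/// Returns true if the blocking consumer received its message.
fn run(consumer_on_own_thread: bool) -> bool {
    let (worker, handle) = spawn_worker();
    let (msg_tx, msg_rx) = channel::<&'static str>();
    let (result_tx, result_rx) = channel::<bool>();

    // Blocking consumer: waits for a message, like a sync call blocking on `.next()`.
    let consumer = move || {
        let got = msg_rx.recv_timeout(Duration::from_millis(300)).is_ok();
        result_tx.send(got).unwrap();
    };
    // Producer: always runs on the worker, like the I/O that feeds the stream.
    let producer: Job = Box::new(move || {
        let _ = msg_tx.send("request");
    });

    let side_thread = if consumer_on_own_thread {
        Some(thread::spawn(consumer)) // rough analogue of spawn_blocking
    } else {
        worker.send(Box::new(consumer)).unwrap(); // rough analogue of spawn
        None
    };
    worker.send(producer).unwrap();

    let got = result_rx.recv().unwrap();
    drop(worker); // close the queue so the worker thread exits
    handle.join().unwrap();
    if let Some(t) = side_thread {
        t.join().unwrap();
    }
    got
}

fn main() {
    println!("consumer on the worker:   received = {}", run(false));
    println!("consumer on its own thread: received = {}", run(true));
}
```

When the consumer is queued on the worker, the producer behind it cannot run until the consumer gives up, so the wait times out. On a dedicated thread, the worker stays free to deliver the message.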

I'm not sure. I tried using spawn_blocking here without awaiting its future, but then the client printed no response and the server printed no logs. I thought there was something I was missing. :disappointed: Is there anything I can refer to about the difference in behavior? Thx!

It turns out I had written

spawn_blocking(move || async move {

which is incorrect, since spawn_blocking needs to be passed a sync closure. With spawn_blocking(move || { it works fine.

I haven't read the rest of the thread, but writing spawn_blocking(move || async move { is not what you want. It will not run the code inside the async block.
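A small sketch of why the body never runs, using only the standard library: std::thread::spawn stands in for spawn_blocking here, since both take a sync closure and hand back whatever it returns. The function names are hypothetical. A closure of the form move || async move { ... } merely constructs a future and returns it; because futures are lazy, nothing inside executes unless something polls that future.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// The closure returns an async block, so the thread only *creates* a future.
fn run_async_wrapped() -> bool {
    let ran = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&ran);
    let handle = thread::spawn(move || async move {
        flag.store(true, Ordering::SeqCst);
    });
    // `join` hands back the unpolled future; dropping it discards the body unexecuted.
    let _never_polled = handle.join().unwrap();
    ran.load(Ordering::SeqCst)
}

/// The closure body is plain sync code, so it runs on the spawned thread.
fn run_sync() -> bool {
    let ran = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&ran);
    thread::spawn(move || {
        flag.store(true, Ordering::SeqCst);
    })
    .join()
    .unwrap();
    ran.load(Ordering::SeqCst)
}

fn main() {
    println!("async-wrapped closure ran its body: {}", run_async_wrapped());
    println!("sync closure ran its body: {}", run_sync());
}
```

This matches the symptom reported earlier in the thread: no output from the client and no logs from the server, because the FFI call sat inside a future that was never polled.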