I'm trying to use Tonic to build a gRPC server which is able to do bidirectional streaming rpc calls.
I tried to simplified the question minimal and can be run by:
cargo run --bin server
cargo run --bin client
run multiple times half of the time client does not print anything, and server is blocking on get .next()
after send a few messages
in here
The rpc call is as the following:
rpc Echo(stream HelloRequest) returns (stream HelloReply);
where echo payload has only one filed of type string.
The implementation is intend to be used with FFI, as main rpc functional is provide by calling a FFI function. So I tried to expose some interface for streaming part.
In the service trait:
type EchoStream = Pin<Box<dyn Stream<Item = Result<HelloReply, Status>> + Send + 'static>>;
async fn echo(
&self,
request: Request<Streaming<HelloRequest>>,
) -> Result<Response<Self::EchoStream>, Status> {
{
let f = self.echo;
let request = request.into_inner();
let (tx, rx) = mpsc::unbounded_channel::<Result<HelloReply, Status>>();
let response = UnboundedReceiverStream::new(rx);
spawn(async move {
println!("begin spawn_blocking");
let tx: Box<UnboundedSender<Result<HelloReply, Status>>> = box tx;
let tx = Box::into_raw(tx) as *mut libc::c_void;
let rx: Box<Streaming<HelloRequest>> = box request;
let rx = Box::into_raw(rx) as *mut libc::c_void;
f(tx, rx);
println!("end spawn_blocking");
});
let response = Box::pin(response);
let response = response as Self::EchoStream;
Ok(Response::new(response))
}
}
here self.f
and f
stands for the FFI function. The arguments are
- the raw pointer to the sender of response stream
- the raw pointer to the
Streaming<HelloRequest>
to use the raw pointer to streaming request, here is a function:
#[repr(C)]
pub struct ByteString {
ptr: *const u8,
len: usize,
}
#[no_mangle]
pub unsafe extern "C" fn next_hello_request_name(stream: *mut libc::c_void) -> *const ByteString {
let stream = stream as *mut Streaming<HelloRequest>;
let stream = stream.as_mut().unwrap();
let request = {
println!("block on next");
let ret = block_on(stream.next());
println!("next done");
ret
};
let (ptr, len) = match request {
Some(request) => {
let request: HelloRequest = request.unwrap();
let name: String = request.name;
let (ptr, len, _) = name.into_raw_parts();
(ptr as *const u8, len)
}
None => (ptr::null(), 0),
};
let bs = box ByteString { ptr, len };
Box::into_raw(bs)
}
and for send part:
#[no_mangle]
pub unsafe extern "C" fn send_hello_reply(sender: *mut libc::c_void, x: *mut ByteString) {
println!("enter send");
let sender = sender as *mut UnboundedSender<Result<HelloReply, Status>>;
let sender = sender.as_ref().unwrap();
println!("do send");
let x: Box<ByteString> = Box::from_raw(x);
let (ptr, len) = (x.ptr, x.len);
let x = String::from_raw_parts(ptr as *mut u8, len, len);
let message = x.clone();
forget(x);
let x = HelloReply { message };
let x = Ok(x);
println!("block on send");
tokio::sync::mpsc::UnboundedSender::send(sender, x).unwrap();
println!("send done");
}
all extern "C"
FFI functions are sync. plus the start server function:
#[no_mangle]
pub extern "C" fn main() {
let addr = HOST_PORT.parse().unwrap();
let greeter = GreeterImpl { echo };
let future = async {
Server::builder()
.add_service(GreeterServer::new(greeter))
.serve(addr)
.await
.unwrap();
};
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(future)
}
here is a runable example https://github.com/alissa-tung/issue
cargo run --bin server
cargo run --bin client
run multiple times half of the time client does not print anything, and server is blocking on get .next()
after send a few messages
I am wandering whether it is bad to pass them to FFI or what is the idiomatic usage. And am I use async stuff wrong since the async function is been called from a sync context, where the sync function is been called in an async context.