I have a Tonic Server which continuously streams hello
to its client if connected.
pub mod hello_world {
tonic::include_proto!("helloworld"); // The string specified here must match the proto package name
}
#[derive(Debug, Default)]
pub struct MyGreeter {}
#[tonic::async_trait]
impl Greeter for MyGreeter {
type SayHelloStream = ReceiverStream<Result<HelloReply, Status>>;
async fn say_hello(&self, request: Request<HelloRequest>,) -> Result<Response<Self::SayHelloStream>, Status> {
let (tx, rx) = mpsc::channel(4);
let name = request.into_inner().name;
tokio::spawn(async move {
loop {
tx.send(Ok(HelloReply { message: format!("Hello {}", name.clone()) })).await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse()?;
let greeter = MyGreeter::default();
let (tx, mut rx) = mpsc::channel::<()>(1);
let handler = tokio::spawn(async move {
Server::builder()
.add_service(GreeterServer::new(greeter))
.serve_with_shutdown(addr, rx.recv().map(|_|()))
.await
});
tokio::time::sleep(Duration::from_secs(5)).await;
println!("Shutting down server");
tx.send(()).await.unwrap();
let _ = handler.await;
println!("Finished");
Ok(())
}
The data is generated by a fixed interval in a busy loop that runs in a separated thread.
loop {
tx.send(Ok(HelloReply { message: format!("Hello {}", name.clone()) })).await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
}
The code works smoothly if no client connects.
However, when the server is serving its client, the shutdown signal fails its duty and the server gets stuck in a hanging state.
I wonder if there is an event that can be tracked when the server is running into shutdown. Upon that, the busy loop can capture that and quit immediately.