Futures/streams/grpc-rs: How to cancel a server to client stream?


I’m streaming from server to client using grpc-rs.

The stream is working, but when the client is done and cancels the stream, the server never stops sending items into it. It keeps sending until the client process is killed. I’m unsure whether this is due to a bug in grpc-rs or to bad use of futures in my own code.

Ideally I’m looking for someone who has been able to cleanly cancel a grpc-rs server-to-client stream or, failing that, someone who can verify that my futures code looks reasonable.

I’ve created an example demonstrating the problem here:

Even if you don’t have time to look into that example, can you verify that this futures code looks reasonable? In particular, ItemStream is infinite and I’m using send_all, but I assume the Sink abstraction should return an error and exit out of that send_all once the sink is closed, right?

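fn stream_case_1(
    &mut self,
    ctx: RpcContext,
    req: StreamRequest,
    sink: ServerStreamingSink<StreamItem>,
) {
    // ItemStream is infinite; pair each item with default write flags.
    let stream = ItemStream::new()
        .map(|item| (item, WriteFlags::default()))
        .map_err(|_| Error::RpcFailure(RpcStatus::ok()));
    // Assumed wrapper: the send_all future is spawned on the RPC context.
    ctx.spawn(
        sink.send_all(stream)
            .map(|_| println!("completed"))
            .map_err(|e| println!("failed to reply: {:?}", e)),
    );
}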
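
To make the expectation concrete, here is a minimal std-only sketch of the behaviour I would expect from send_all on an infinite stream. It is not grpc-rs code and says nothing about what grpc-rs actually does: a `std::sync::mpsc` rendezvous channel stands in for the sink, the range `0..` stands in for ItemStream, and `send_all_until_closed` and `run_demo` are hypothetical names made up for this illustration.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

/// Hypothetical stand-in for `sink.send_all(stream)`: pushes items from an
/// infinite source and stops at the first send error.
fn send_all_until_closed(tx: SyncSender<u64>) -> u64 {
    let mut sent = 0;
    // `0..` plays the role of the infinite ItemStream.
    for item in 0u64.. {
        // `send` returns Err once the receiving side has been dropped,
        // which is the signal I would expect a closed sink to give.
        if tx.send(item).is_err() {
            break;
        }
        sent += 1;
    }
    sent
}

/// Runs the "server" on its own thread. The "client" reads `take` items and
/// then cancels by dropping the receiver. Returns the items received and the
/// number of sends the server completed before it stopped.
fn run_demo(take: usize) -> (Vec<u64>, u64) {
    // Capacity 0 means each send waits for a matching receive.
    let (tx, rx) = sync_channel::<u64>(0);
    let server = thread::spawn(move || send_all_until_closed(tx));
    let received: Vec<u64> = rx.iter().take(take).collect();
    drop(rx); // client-side cancel
    let sent = server.join().expect("server thread panicked");
    (received, sent)
}
```

Calling `run_demo(3)` returns once the receiver is dropped, because the sender loop breaks on the first failed send. That is the termination I assumed send_all would give me when the client cancels the stream.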