I'm implementing an RPC service with grpcio (futures ^0.1.15).
As part of the service I would like to stream events to the client. Conceptually I would like to handle this with a standard channel: the server would hold the sending end and send events until the client drops the receiver.
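To make the intended behaviour concrete, here is a minimal sketch using only std::sync::mpsc. The Event struct and the produce_events / consume_events helpers are hypothetical stand-ins for illustration, not part of my service or of grpcio.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Hypothetical stand-in for the generated `Event` message.
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub name: String,
}

/// Sends numbered events until `limit` is reached or the receiver is dropped.
/// Returns how many events were accepted by the channel.
pub fn produce_events(sender: Sender<Event>, limit: usize) -> usize {
    let mut sent = 0;
    for i in 0..limit {
        let event = Event { name: i.to_string() };
        // `send` fails only when the receiving end has been dropped.
        if sender.send(event).is_err() {
            break;
        }
        sent += 1;
    }
    sent
}

/// Simulates a client that reads up to `take` events and then disconnects.
pub fn consume_events(receiver: Receiver<Event>, take: usize) -> Vec<String> {
    let names: Vec<String> = receiver.iter().take(take).map(|e| e.name).collect();
    // `receiver` is dropped when this function returns, which is what
    // makes later `send` calls on the other end fail.
    names
}
```

The point of the sketch is that the sender needs no cloning and no future to drive: a failed send is the only signal that the client has gone away.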
I couldn't get things working with a standard channel. Is there some way to bridge a standard channel receiver to a futures::sync::mpsc::Receiver?
Anyway, I have something working by cobbling together what I've found in futures::sync::mpsc. I'd like to know how to improve the code, and in particular how best to send values into the futures::sync::mpsc::channel.
Here's my code:
    use futures::prelude::*;
    use futures::sync::mpsc::*;
    use grpcio::{RpcContext, ServerStreamingSink, WriteFlags};
    use log::{error, info};
    use std::thread;

    use crate::proto::workspace::{Event, StreamEventsRequest};
    use crate::proto::workspace_grpc;

    #[derive(Clone)]
    pub struct EventsService {}

    impl workspace_grpc::EventsService for EventsService {
        fn stream_events(
            &mut self,
            ctx: RpcContext,
            _req: StreamEventsRequest,
            sink: ServerStreamingSink<Event>,
        ) {
            // 1. Create a channel
            let (sender, receiver) = channel(10);

            // 2. Forward channel receiver events to sink to client
            ctx.spawn(
                receiver
                    .map(|e| (e, WriteFlags::default()))
                    .forward(sink.sink_map_err(|e| error!("sink error: {:?}", e)))
                    .map(|_| ())
            );

            // 3. Start a server process to send events for a while. The thread is used
            //    to simulate what I really want to do: Save the `Sender` in the
            //    `EventsService` struct, and when an event happens on the server use
            //    that sender to notify the client.
            thread::spawn(move || {
                for i in 0..3 {
                    let mut event = Event::new();
                    event.set_name(format!("{}", i));
                    thread::sleep(::std::time::Duration::from_millis(1000));
                    if sender.clone().send(event).wait().is_err() {
                        return
                    }
                }
            });
        }
    }
I'm interested in any improvements.
I'm particularly interested in step (3) in the code above. Right now I need to clone the sender for each send, and then wait on the result of the send to drive the future to completion. Do I really need or want to do that for each send? Is there a way to send without blocking?
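For comparison, this is the kind of non-blocking send I have in mind, sketched with the standard library's bounded channel rather than with futures. The Notify enum and the notify helper are hypothetical names for illustration. As far as I can tell, futures 0.1 offers similar calls (try_send on the bounded Sender and unbounded_send on UnboundedSender), but I haven't worked out whether they are the right fit here.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Outcome of a single non-blocking send attempt (hypothetical helper type).
#[derive(Debug, PartialEq)]
pub enum Notify {
    /// The event was queued.
    Sent,
    /// The buffer is full; the event was not queued.
    Full,
    /// The receiver is gone; the caller should stop sending.
    Disconnected,
}

/// Tries to queue `name` without blocking and without cloning the sender.
pub fn notify(sender: &SyncSender<String>, name: &str) -> Notify {
    match sender.try_send(name.to_string()) {
        Ok(()) => Notify::Sent,
        Err(TrySendError::Full(_)) => Notify::Full,
        Err(TrySendError::Disconnected(_)) => Notify::Disconnected,
    }
}
```

With something like this, the service could keep the sender in a field and call notify whenever an event occurs, deciding for itself what to do when the buffer is full.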