Sending values to futures::sync::mpsc::Sender from another thread


I’m implementing an RPC service with grpcio (futures ^0.1.15).

As part of the service, I would like to stream events to the client. Conceptually, I would like to handle this with a standard channel: the server would hold the sending end and send events until the client drops the receiver.
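To make the model concrete, here is a minimal sketch of what I mean, using only std::sync::mpsc and no grpcio at all. The names send_until_dropped and run_demo are made up for illustration, and the "client" is just a receiver that reads a few events and is then dropped.

```rust
use std::sync::mpsc;
use std::thread;

/// Sends up to `total` numbered events and stops as soon as a send fails.
/// Returns how many sends succeeded.
fn send_until_dropped(sender: mpsc::Sender<String>, total: usize) -> usize {
    let mut sent = 0;
    for i in 0..total {
        // `send` on a std channel only fails once the receiver has been dropped.
        if sender.send(format!("event {}", i)).is_err() {
            break;
        }
        sent += 1;
    }
    sent
}

/// Hypothetical "client": reads `take` events, then drops the receiver.
/// Returns the events it saw and the number of sends that succeeded.
fn run_demo(total: usize, take: usize) -> (Vec<String>, usize) {
    let (sender, receiver) = mpsc::channel();
    // The producer thread stands in for the server side.
    let producer = thread::spawn(move || send_until_dropped(sender, total));
    let received: Vec<String> = receiver.iter().take(take).collect();
    drop(receiver);
    let sent = producer.join().unwrap();
    (received, sent)
}
```

Calling run_demo(5, 2) returns the first two events. Because a std channel is unbounded, the producer may already have queued more than two events before the receiver goes away, so the second value can be anywhere from 2 to 5.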

I couldn’t get things working with a standard channel. Is there some way to bridge a standard channel receiver to a futures::sync::mpsc::Receiver?

Anyway, I have something working, cobbled together from what I’ve found in futures::sync::mpsc. I’d like to know how to improve the code, and in particular how best to send values into the futures::sync::mpsc::channel.

Here’s my code:

use futures::prelude::*;
use futures::sync::mpsc::*;
use grpcio::{RpcContext, ServerStreamingSink, WriteFlags};
use log::{error, info};
use std::thread;

use crate::proto::workspace::{Event, StreamEventsRequest};
use crate::proto::workspace_grpc;

#[derive(Clone)]
pub struct EventsService {}

impl workspace_grpc::EventsService for EventsService {
	fn stream_events(
		&mut self,
		ctx: RpcContext,
		_req: StreamEventsRequest,
		sink: ServerStreamingSink<Event>,
	) {

		// 1. Create a channel
		let (sender, receiver) = channel(10);

		// 2. Forward channel receiver events to sink to client
		ctx.spawn(
			receiver
				.map(|e| (e, WriteFlags::default()))
				.forward(sink.sink_map_err(|e| error!("sink error: {:?}", e)))
				.map(|_| ())
		);

		// 3. Start a server process to send events for a while. The thread is used
		//    to simulate what I really want to do: Save the `Sender` in the
		//    `EventsService` struct, and when an event happens on the server use
		//    that sender to notify the client.
		thread::spawn(move || {
			for i in 0..3 {
				let mut event = Event::new();
				event.set_name(format!("{}", i));
				thread::sleep(::std::time::Duration::from_millis(1000));
				if sender.clone().send(event).wait().is_err() {
					return;
				}
			}
		});
	}
}

I’m interested in any improvements.

I’m particularly interested in step (3) in the above code. Right now, to send I need to clone the sender each time and then wait on the result of the send to drive the future to completion. Do I really need (or want) to do that for each send? Is there a way to send without blocking?
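For reference, this is the kind of non-blocking send I have in mind, sketched with the standard library's bounded channel because I'm sure of its API. The Offer enum and the offer function are hypothetical names of my own. futures 0.1's Sender also has a try_send method, but I haven't checked how it behaves in the grpcio setup above, so treat this only as an illustration of the semantics I'm after.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Outcome of one non-blocking send attempt (hypothetical helper type).
#[derive(Debug, PartialEq)]
enum Offer {
    Sent,
    BufferFull,
    ClientGone,
}

/// Tries to enqueue `event` without blocking and without cloning the sender.
fn offer(sender: &SyncSender<String>, event: String) -> Offer {
    match sender.try_send(event) {
        Ok(()) => Offer::Sent,
        // The bounded buffer is full; the caller decides whether to drop or retry.
        Err(TrySendError::Full(_)) => Offer::BufferFull,
        // The receiving end was dropped, so there is nobody left to notify.
        Err(TrySendError::Disconnected(_)) => Offer::ClientGone,
    }
}
```

With a channel created by sync_channel(1), the first offer succeeds, a second one made before anything is received reports BufferFull, and once the receiver is dropped every offer reports ClientGone.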