Hi!
For my first project in Rust I've decided to work on a toy RPC library. I'm implementing a multiplex handler that can be used for both client and server.
Since I'm quite new to Rust, and even more so to the futures/tokio ecosystem, I'm unsure whether this is a good implementation. What do you think of the following strategy?
The main points of this design:
- one struct, Multiplex, acts as the transport, and one trait, MultiplexFrame, is implemented by frames so that incoming ones can be handled;
- frame id management is done at the MultiplexFrame level;
- this gives us a client/server-agnostic multiplex that can itself be used as a transport;
- sharing the transport through a SharedQueue instance, instead of an impl of the Clone trait on Multiplex plus an inner Rc<_>, avoids the Rc boilerplate when it is not necessary.
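As a rough illustration of the last point, here is a minimal sketch of an opt-in sharing wrapper. It is not the actual SharedQueue from this project: `Shared` and `Queue` are hypothetical names, and the queue is a plain std type standing in for a transport, so the example runs without futures or tokio.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

/// Hypothetical stand-in for a transport; it knows nothing about sharing.
#[derive(Default)]
pub struct Queue {
    frames: VecDeque<u64>,
}

impl Queue {
    pub fn push(&mut self, frame: u64) {
        self.frames.push_back(frame);
    }

    pub fn pop(&mut self) -> Option<u64> {
        self.frames.pop_front()
    }
}

/// Opt-in wrapper: only code that needs several handles pays for the Rc.
pub struct Shared<T> {
    inner: Rc<RefCell<T>>,
}

impl<T> Shared<T> {
    pub fn new(inner: T) -> Self {
        Shared { inner: Rc::new(RefCell::new(inner)) }
    }

    /// Run a closure with mutable access to the wrapped value.
    pub fn with<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
        let mut guard = self.inner.borrow_mut();
        f(&mut *guard)
    }
}

// Manual impl so that `T` itself does not have to be `Clone`.
impl<T> Clone for Shared<T> {
    fn clone(&self) -> Self {
        Shared { inner: Rc::clone(&self.inner) }
    }
}
```

With this shape the wrapped type stays free of Rc, and every clone of `Shared` reaches the same underlying value. Calling `with` again from inside its own closure would panic on the RefCell borrow, which is one trade-off of this approach.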
The code:
//...
type RequestId = u64;
/// Trait implemented for multiplex frames.
pub trait MultiplexFrame {
    /// Return the frame's id.
    fn get_id(&self) -> RequestId;
    /// Return true if the frame is a request.
    fn is_request(&self) -> bool;
    /// Return true if the frame is a response.
    fn is_response(&self) -> bool;
}
/// Multiplex structure: implements Stream and Sink:
/// - stream: polls incoming requests (frames that do not answer an emitted request)
/// - sink: outputs messages to the owned transport
/// An extra method, request(), can be used to fire a request; it returns a
/// future that completes when the matching response comes back.
pub struct Multiplex<T>
    where T: 'static + Stream + Sink,
          T::Item: MultiplexFrame,
          T::SinkItem: MultiplexFrame
{
    transport: T,
    in_flights: HashMap<RequestId, oneshot::Sender<T::Item>>,
}
impl<T> Multiplex<T>
    where T: 'static + Stream + Sink,
          T::Item: MultiplexFrame,
          T::SinkItem: MultiplexFrame
{
    pub fn new(transport: T) -> Multiplex<T> {
        //...
    }
    /// Send a request and return a future that resolves with its response.
    pub fn request(&mut self, item: T::SinkItem)
        -> oneshot::Receiver<T::Item>
    {
        //... return future
    }
}
impl<T> Stream for Multiplex<T>
    where T: 'static + Stream + Sink,
          T::Item: MultiplexFrame,
          T::SinkItem: MultiplexFrame
{
    type Item = T::Item;
    type Error = T::Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>
    {
        // poll items from self.transport as Stream
        // - on responses: send item to held in_flights future
        // - on request: return from poll() with the item
    }
}
impl<T> Sink for Multiplex<T>
    where T: 'static + Stream + Sink,
          T::Item: MultiplexFrame,
          T::SinkItem: MultiplexFrame
{
    type SinkItem = T::SinkItem;
    type SinkError = T::SinkError;
    fn start_send(&mut self, item: Self::SinkItem)
        -> StartSend<Self::SinkItem, Self::SinkError>
    {
        // we don't keep track of messages sent from here
        self.transport.start_send(item)
    }
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError>
    {
        self.transport.poll_complete()
    }
}
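The routing described in the comments of poll() can be sketched without any futures machinery. This is only an illustration under stated assumptions: `Frame` and `Dispatcher` are hypothetical names, and `std::sync::mpsc` channels stand in for `oneshot` so that the example builds with the standard library alone.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

type RequestId = u64;

/// Hypothetical frame type, just enough to drive the example.
#[derive(Debug, PartialEq)]
pub struct Frame {
    pub id: RequestId,
    pub is_request: bool,
    pub value: u64,
}

/// Holds the senders of requests that are still waiting for a response.
#[derive(Default)]
pub struct Dispatcher {
    in_flights: HashMap<RequestId, Sender<Frame>>,
}

impl Dispatcher {
    /// Register an outgoing request; the receiver yields its response.
    pub fn register(&mut self, id: RequestId) -> Receiver<Frame> {
        let (tx, rx) = channel();
        self.in_flights.insert(id, tx);
        rx
    }

    /// Route one incoming frame.
    /// Requests are handed back to the caller, as poll() would yield them.
    /// Responses are forwarded to the matching waiter and consumed.
    pub fn dispatch(&mut self, frame: Frame) -> Option<Frame> {
        if frame.is_request {
            return Some(frame);
        }
        if let Some(tx) = self.in_flights.remove(&frame.id) {
            // The waiter may already be gone; ignore the send error then.
            let _ = tx.send(frame);
        }
        // Responses with an unknown id are silently dropped in this sketch.
        None
    }
}
```

Removing the entry on dispatch keeps the map from growing and means a duplicate response for the same id is simply ignored. What to do with unknown ids (drop, log, or return an error) is a design choice this sketch does not settle.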
Since futures consume themselves really easily, I've also implemented a transport that can be shared among multiple instances, such as:
// CalcFrame:
// - struct { id: RequestId, req: bool, value: u64 }
// - `req` is true if message is a request;
// - implements MultiplexFrame
let server = SharedTransport::new(Multiplex::new(transport));
// setup server pipeline: we clone instance of SharedTransport<_> in order
// to still be able to use the server to broadcast frames.
let (w, r) = server.clone().split();
let pipe = r.and_then(|item| {
    Ok(
        CalcFrame {
            id: item.id,
            req: !item.req,
            value: item.value + item.value,
        }
    )
});
let pipe = w.send_all(pipe).then(|_| {
    Ok(())
});
handle.spawn(pipe);
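// broadcast a value, call request on the cloned transport
let foo = server.clone().transport.request(CalcFrame { id: 13, req: true, value: 12 });
let foo = foo.then(|resp| {
    // `then` receives a Result: the oneshot may have been canceled
    if let Ok(resp) = resp {
        println!("response: {}", resp.value);
    }
    Ok(())
});
handle.spawn(foo);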
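For reference, the CalcFrame described in the comments above could implement MultiplexFrame roughly as follows. The trait is repeated so the sketch is self-contained, and `answer` is a hypothetical helper that mirrors the and_then closure of the server pipeline; neither is taken from the real crate.

```rust
type RequestId = u64;

/// Same shape as the trait above, repeated to keep the sketch self-contained.
pub trait MultiplexFrame {
    fn get_id(&self) -> RequestId;
    fn is_request(&self) -> bool;
    fn is_response(&self) -> bool;
}

#[derive(Debug, Clone, PartialEq)]
pub struct CalcFrame {
    pub id: RequestId,
    pub req: bool,
    pub value: u64,
}

impl MultiplexFrame for CalcFrame {
    fn get_id(&self) -> RequestId {
        self.id
    }

    fn is_request(&self) -> bool {
        self.req
    }

    // In this sketch a frame is a response exactly when it is not a request.
    fn is_response(&self) -> bool {
        !self.req
    }
}

/// Hypothetical helper mirroring the server pipeline:
/// keep the id, flip the request flag, and double the value.
pub fn answer(item: &CalcFrame) -> CalcFrame {
    CalcFrame {
        id: item.id,
        req: !item.req,
        value: item.value + item.value,
    }
}
```

Keeping the id unchanged in the answer is what lets the requesting side match the response to its entry in in_flights.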
Thanks!