Futures/Tokio: beginner and toy multiplexing reimplementation


#1

Hi!
For my first project in Rust I’ve decided to work on a toy rpc library. I’m implementing a multiplex handler that can be used for client and server.

Since I’m quiete new to Rust and furthmore to futures/tokio sphere, I was quiete uncertain about how would it be a good implementation or no. What do you think of the following strategy?

.

The differents points about this design:

  • one struct Multiplex acting as transport & one MultiplexFrame trait impl in order to handle incoming frames;
  • frame id management is done at the MultiplexFrame level;
  • it that allows us to have a client/server agnostic multiplex that can be used as a transport itself;
  • sharing transport using SharedQueue instance instead of a impl of the Clone trait on Multiplex + inner Rc<_> avoids this Rc boilerplate when not necessary;

The code:

//...

type RequestId = u64;

///! Trait implemented for multiplex frames.
pub trait MultiplexFrame {
    // Return Frame's id
    fn get_id(&self) -> RequestId;

    // Return True if the frame is a request
    fn is_request(&self) -> bool;

    // Return True if the frame is a response.
    fn is_response(&self) -> bool;
}

/// Multiplex structure: implements stream and sink:
/// - stream: poll incoming requests (not answering to an emitted one
/// - sink: output message to owned transport
/// An extra method request() can be used to fire requests and return a
/// futures upon completion of the request (when a response returned)
pub struct Multiplex<T>
    where T: 'static + Stream + Sink,
          T::Item: MultiplexFrame,
          T::SinkItem: MultiplexFrame
{
    transport: T,
    in_flights: HashMap<RequestId, oneshot::Sender<T::Item>>,
}


impl<T> Multiplex<T>
    where T: 'static + Stream + Sink,
          T::Item: MultiplexFrame,
          T::SinkItem: MultiplexFrame
{
    pub fn new(transport: T) -> Multiplex_<T> {
        //...
    }

    ///! Send a request and call given handler when it comes back
    pub fn request(&mut self, item: T::SinkItem)
        -> oneshot::Receiver<T::Item>
    {
        //... return future
    }
}

impl<T> Stream for Multiplex<T>
    where T: 'static + Stream + Sink,
          T::Item: MultiplexFrame,
          T::SinkItem: MultiplexFrame
{
    type Item = T::Item;
    type Error = T::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>
    {
        // poll items from self.transport as Stream
        // - on responses: send item to held in_flights future
        // - on request: return from poll() with the item 
    }
}

impl<T> Sink for Multiplex<T>
    where T: 'static + Stream + Sink,
          T::Item: MultiplexFrame,
          T::SinkItem: MultiplexFrame
{
    type SinkItem = T::SinkItem;
    type SinkError = T::SinkError;

    fn start_send(&mut self, item: Self::SinkItem)
        -> StartSend<Self::SinkItem, Self::SinkError>
    {
        // we don't keep trace of sent messages from here
        self.transport.start_send(item)
    }

    fn poll_complete(&mut self) -> Poll<(), Self::SinkError>
    {
        self.transport.poll_complete()
    }
}

Since futures consumes themselves really easilly, I’ve also implemented a transport that can be shared among multiple instance, such as:

// CalcFrame:
// - struct { id: RequestId, req: bool, value: u64 }
// - `req` is true if message is a request;
// - implements MultiplexFrame

let server = SharedTransport::new(Multiplex::new(transport));

// setup server pipeline: we clone instance of SharedTransport<_> in order
// to still be able to use the server to broadcast frames.
let (w,r) = server.clone().split();
let pipe = r.and_then(|item| {
    Ok(
        CalcFrame {
            id: item.id,
            req: !item.req,
            value: item.value + item.value,
        }
    )
});
let pipe = w.send_all(pipe).then(|_| {
    Ok(())
});
handle.spawn(pipe);

// broadcast a value, call request on cloned future
let foo = server.clone().transport.request(CalcFrame{ id: 13, true, 12 });
let foo = foo.then(|resp| {
    println!("response: {}", resp.value);
    Ok(())
});
handle.spawn(foo);

Thanks!