Designing an event dispatcher which can look up promises and resolve them?

Hi there,

I am trying to write a library which will use an API over websockets. My library will send requests like,

[<request-type>,<request-id>,<payload>]

and the other side will respond with,

[<response-type>,<request-id>,<payload>]

The responses can come in any order independent of the order in which I sent the requests. I am having trouble designing something like this. My idea so far is,

  1. Get the reader part from websocket client, start another thread and read messages from it in a loop.
  2. When a request comes in,
    a. It should generate a random ID
    b. Use this ID in the request and send it over websocket,
    c. Create a Future<Item=Result<Response, Error>>. Save a reference to this future in a HashMap<RandomId, Future<Item=Result<Response, Error>>>.
    d. Return this future
    This future will be .awaited in the caller.
  3. When we receive a message in #1, it should look up the future in the hashmap using the <request-id> in the response and "resolve" this future such that the caller gets the output.

I don't know if this will work at all. I have not built something like this before, so I am not really sure how to do this.

bump

I have created a dispatcher like this,

pub struct Dispatcher {
    ws_reader: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,

    // Hashmap to store futures?
    map: Arc<RwLock<HashMap<String, ()>>>,
}

impl Dispatcher {
    pub fn new(
        ws_reader: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
        map: Arc<RwLock<HashMap<String, ()>>>,
    ) -> Self {
        Self { ws_reader, map }
    }

    pub async fn run(&mut self) {
        while let Some(v) = self.ws_reader.next().await {
            let message = parser::decode_message(&v.unwrap().to_string());

            let map = self.map.write().unwrap();
            todo!()
        }
    }
}

And the code that uses this dispatcher looks like this,

pub struct Communicator {
    // ws_sender: Box<dyn NetworkStream + Send>,
    transaction_queue: VecDeque<()>,
    // TODO(ishan): Needs a executor and a spawner
    map: Arc<RwLock<HashMap<String, ()>>>,
}

impl Communicator {
    pub async fn connect(uri: &str) -> Self {
        let (ws_client, _) = connect_async(req).await.unwrap();

        let (mut writer, mut reader) = ws_client.split();

        let mut executor = Dispatcher::new(reader, map.clone());

        tokio::spawn(executor.run());

        Self {
            executor,
            map,
            transaction_queue: VecDeque::new(),
        }
    }

    pub fn disconnect(self) -> Result<(), IoError> {
        // self.ws_sender.shutdown_all()
        // Shut down the executor

        Ok(())
    }

    pub async fn send_call(&mut self, req: Request) -> Result<(), &'static str> {

        self.executor.spawn_with_handle()

        todo!()
    }
}

Right now, I am not sure how to run the executor in another thread and implement/call a .spawn_with_handle method on it.

Are those requests sent as a single websocket message, or three?

My first comment is that I would never put a future in a hash map. Put something like a oneshot channel instead. My second comment is that I would use the actor pattern for this.

Once I have the answer to the first question, I can give more details on how I would approach solving this.
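
To make the oneshot idea a bit more concrete, here is a minimal sketch of a map that stores the sending half of a channel per request id instead of a future. It uses only the standard library, so `std::sync::mpsc` stands in for a real oneshot channel such as `tokio::sync::oneshot`. `PendingReplies`, `register`, `resolve` and the fields of `Response` are names made up for this illustration.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical response type; only the id matters for routing.
#[derive(Debug, PartialEq)]
struct Response {
    id: String,
    payload: String,
}

/// Maps each in-flight request id to the sending half of a channel.
#[derive(Default)]
struct PendingReplies {
    waiting: HashMap<String, Sender<Response>>,
}

impl PendingReplies {
    /// Record a request as in flight and return the half the caller waits on.
    fn register(&mut self, id: &str) -> Receiver<Response> {
        let (tx, rx) = channel();
        self.waiting.insert(id.to_string(), tx);
        rx
    }

    /// Deliver a response to whoever registered its id.
    /// Hands the response back if nobody is waiting for it.
    fn resolve(&mut self, response: Response) -> Result<(), Response> {
        match self.waiting.remove(&response.id) {
            Some(tx) => {
                // The caller may have given up already; a failed send is ignored.
                let _ = tx.send(response);
                Ok(())
            }
            None => Err(response),
        }
    }
}
```

Because each entry is removed when its response arrives, the order in which responses come back does not matter, and a response with an unknown id is returned to the reader loop so it can be handled some other way.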

I have a library that works like this. You can look at how I solved it in nvim-rs/neovim.rs at master · KillTheMule/nvim-rs · GitHub (every message sent is a request, btw). I'm probably going to study @alice's actor pattern though, maybe that works better :wink:

Are those requests sent as a single websocket message, or three?

Each request and response is sent in a single websocket message.

While searching for ways to do this, I found this thread: N00b question - what is the Rust'ish way of programming serious event-driven programs? - #3 by ahunter

In that thread, people also suggested using the actor pattern. I don't know it well enough yet, but I will try it out.

Also, just to reiterate/clarify the project I am trying to build:

I can send queries to this websocket API and it'll respond in arbitrary order AND the other peer can also send events. My program will have to parse these events and trigger some callbacks based on the type of event.

I think reading messages in a loop and calling the associated callbacks for some message types is not super difficult, but I couldn't figure out a way to match the responses to the queries.
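
For the callback half, a rough sketch of what I have in mind is a table of callbacks keyed by event type. Everything here is an assumption for illustration: the `Event` struct, its `kind` and `payload` fields, and the `EventHandlers` type are made-up names, and the real events will carry more than a string payload.

```rust
use std::collections::HashMap;

/// Hypothetical event: a type name plus an opaque payload.
struct Event {
    kind: String,
    payload: String,
}

/// Callbacks registered per event type.
#[derive(Default)]
struct EventHandlers {
    handlers: HashMap<String, Vec<Box<dyn FnMut(&str) + Send>>>,
}

impl EventHandlers {
    /// Register a callback for one event type.
    fn on(&mut self, kind: &str, callback: impl FnMut(&str) + Send + 'static) {
        self.handlers
            .entry(kind.to_string())
            .or_default()
            .push(Box::new(callback));
    }

    /// Run every callback registered for this event's type.
    /// Returns how many callbacks were run.
    fn dispatch(&mut self, event: &Event) -> usize {
        match self.handlers.get_mut(&event.kind) {
            Some(callbacks) => {
                for callback in callbacks.iter_mut() {
                    callback(&event.payload);
                }
                callbacks.len()
            }
            None => 0,
        }
    }
}
```

The read loop would call `dispatch` for every message that is an event rather than a response.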

The reference implementation of the protocol is written in Kotlin. It has a promise repository (a HashMap of CompletableFuture) and solves the problem that way.

This oneshot channel approach looks quite good. Thank you for sharing the code.

(Since you responded so quickly: I have to leave the computer now, but I can respond when I get back.)

Here's how I would design something like this. This implementation has some mpsc and oneshot channels that it uses to communicate with the rest of the program.

The basic idea is that this piece of code has only a single responsibility: route messages between the websocket itself and the rest of the program, keeping track of where each response needs to go.

Note that I would be wary of adding more complexity to this piece of code. It really is intended to just do these things and these things only. If you need to do more, I would strongly consider putting it somewhere outside the actor.

use futures::stream::{StreamExt, SplitStream, SplitSink};
use std::collections::HashMap;
use std::pin::Pin;
use tokio::net::TcpStream;
use tokio::sync::{oneshot, mpsc};
use tokio_tungstenite::{WebSocketStream, MaybeTlsStream};

struct Request {
    id: String,
    // .. extra data goes here
}
struct Response {
    id: String,
    // .. extra data goes here
}

impl Request {
    fn into_websocket(self) -> tungstenite::Message {
        todo!()
    }
}

impl Response {
    fn from_websocket(msg: tungstenite::Message) -> anyhow::Result<Self> {
        todo!()
    }
}

struct ActorState {
    reader: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
    writer: WebsocketWriter,
    pending_operations: HashMap<String, oneshot::Sender<Response>>,
    incoming_operations: mpsc::Receiver<(Request, oneshot::Sender<Response>)>,
    unknown_events: mpsc::Sender<Response>,
}

/// Run the actor until something goes wrong.
async fn run_actor(mut state: ActorState) -> anyhow::Result<()> {
    loop {
        let accept_incoming = state.writer.to_write.is_none();

        tokio::select! {
            message = state.reader.next() => {
                match message {
                    Some(Ok(msg)) => {
                        let response = Response::from_websocket(msg)?;
                        if let Some(sender) = state.pending_operations.remove(&response.id) {
                            // Don't care about failures to send to oneshot.
                            let _ = sender.send(response);
                        } else {
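                            // No caller is waiting for this id, so forward it as an event.
                            // `mpsc::Sender::send` is async and does nothing unless awaited.
                            // Note that this waits while the channel is full.
                            let _ = state.unknown_events.send(response).await;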
                        }
                    },
                    // Connection failed with error.
                    Some(Err(err)) => return Err(err.into()),
                    // Connection was closed normally.
                    None => return Ok(()),
                }
            },
            new_op = state.incoming_operations.recv(), if accept_incoming => {
                match new_op {
                    Some((new_op, oneshot)) => {
                        state.pending_operations.insert(new_op.id.clone(), oneshot);
                        state.writer.to_write = Some(new_op.into_websocket())
                    },
                    // The incoming_operations channel has been closed.
                    None => return Ok(()),
                }
            },
            result = state.writer.write_next() => {
                if let Err(result) = result {
                    return Err(result.into());
                }
            },
        }
    }
}

/// The SinkExt trait didn't quite have the functions I need here, so
/// I wrote my own functions.
struct WebsocketWriter {
    writer: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
    to_write: Option<tungstenite::Message>,
}

impl WebsocketWriter {
    /// Write the item in to_write to the sink, returning when it is written.
    ///
    /// If there is nothing to write, this method will call flush and then
    /// sleep forever.
    ///
    /// This function is designed to be cancel safe. 
    async fn write_next(&mut self) -> tungstenite::error::Result<()> {
        use futures::sink::{Sink, SinkExt};

        if self.to_write.is_some() {
            let mut writer = Pin::new(&mut self.writer);
            // Wait for the sink to become ready to accept items, then write it.
            //
            // I do it this way instead of going through SinkExt because SinkExt
            // wants us to give it the item before we know the sink is ready,
            // but that could mean that we lose the item if the `tokio::select!`
            // cancels us while it is waiting for the sink to become ready.
            futures::future::poll_fn(|cx| writer.as_mut().poll_ready(cx)).await?;

            // Calling `start_send` has no await, so we can't be cancelled at this
            // point.
            writer.start_send(self.to_write.take().unwrap())?;

            Ok(())
        } else {
            // There's no item to write right now. We do a flush, then
            // just sleep until cancelled by the `tokio::select!`.
            self.writer.flush().await?;
            std::future::pending().await
        }
    }
}

[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
tokio-tungstenite = "0.17.1"
tungstenite = "0.17.2"
anyhow = "1"
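
The code above does not show the caller side, so here is a small sketch of what a handle to such an actor could look like. To keep it runnable without any dependencies it uses a thread and `std::sync::mpsc` instead of tokio tasks and channels, and because std has no equivalent of `tokio::select!`, the two input sources are merged into one `ActorMessage` enum. `ActorMessage`, `ActorHandle`, `call` and `spawn_actor` are made-up names for this illustration, and the `FromPeer` variant simulates frames that would really come from the websocket.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Everything the actor can be told, merged into a single channel.
enum ActorMessage {
    /// A caller wants to send a request and wait for its reply.
    Call { id: String, reply_to: Sender<String> },
    /// The peer sent a frame (simulated here; really read from the socket).
    FromPeer { id: String, payload: String },
}

/// Cheap-to-clone handle; the only way other code talks to the actor.
#[derive(Clone)]
struct ActorHandle {
    inbox: Sender<ActorMessage>,
}

impl ActorHandle {
    /// Queue a request and return the channel its reply will arrive on.
    fn call(&self, id: &str) -> Receiver<String> {
        let (reply_to, reply) = channel();
        // A send error means the actor is gone; the caller then sees a
        // closed `reply` channel instead of a response.
        let _ = self.inbox.send(ActorMessage::Call { id: id.to_string(), reply_to });
        reply
    }
}

/// Spawn the actor thread. It alone owns the pending map, so no lock is needed.
fn spawn_actor() -> ActorHandle {
    let (inbox, messages) = channel();
    thread::spawn(move || {
        let mut pending: HashMap<String, Sender<String>> = HashMap::new();
        // The loop ends once every handle has been dropped.
        for message in messages {
            match message {
                ActorMessage::Call { id, reply_to } => {
                    // A real actor would also write the request to the socket here.
                    pending.insert(id, reply_to);
                }
                ActorMessage::FromPeer { id, payload } => {
                    if let Some(reply_to) = pending.remove(&id) {
                        let _ = reply_to.send(payload);
                    }
                }
            }
        }
    });
    ActorHandle { inbox }
}
```

With tokio the shape stays the same: the handle holds the `mpsc::Sender<(Request, oneshot::Sender<Response>)>` that feeds `incoming_operations`, and the caller awaits the `oneshot::Receiver` instead of blocking on `recv`.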