Help needed in choosing concurrency tools to solve a design problem


#1

I am implementing a SignalR client library in Rust. A typical workflow between a client and the SignalR server looks like this:

  1. The client sends a ‘negotiate’ request and the server responds with a session context.
  2. The client then sends a ‘connect’ request and the server starts ‘streaming’ events to the client; in other words, the client subscribes.
  3. The client can then invoke a method on the server, passing the relevant arguments. The server treats this as an event and sends the method name and arguments as stream data to all subscribers.

All of the above requests are REST calls.
I am using hyper to make them.

The code for step 2 looks like:

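fn get_stream(&mut self, url: &str, headers: OptionalRawHeaders, transmitter: Sender<Vec<u8>>) {
    let mut request = Request::new(Method::Get, url.parse().unwrap());
    // Copy any caller-supplied raw headers onto the request.
    if let Some(headers) = headers {
        for (k, v) in headers {
            request.headers_mut().set_raw(k, v);
        }
    }

    // Run the request on its own thread: the body is an endless stream,
    // so driving it on the caller's thread would block it.
    thread::spawn(move || {
        let mut core = Core::new().unwrap();
        let client = Client::new(&core.handle());
        let work = client.request(request).and_then(|res| {
            // Forward every chunk of the response body through the channel.
            res.body().for_each(|chunk| {
                transmitter.send(chunk.to_vec()).expect("Sender error: ");
                future::ok::<_, _>(())
            })
        });
        // Report a stream failure instead of silently discarding the result.
        if let Err(e) = core.run(work) {
            eprintln!("Stream error: {:?}", e);
        }
    });
}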

A thread is spawned because the main thread would otherwise block while the server sends a continuous stream of bytes (events such as method calls made by other clients).
I have used a channel to transmit these events because I need to invoke the callbacks that the user of the client library attaches for method invocations. For example, the user may be interested in tracking a stock price change and want to do something with it.

The receiver end of this channel is also passed to another thread to process the events:

fn start_transport(&mut self) -> JoinHandle<()> {
    //...get arguments to call start() below
    let (tx, rx) = channel();
    let response = self.client_transport
        .as_mut()
        .unwrap()
        .start( //..this will eventually call the above get_stream() method
            url.as_str(),
            connection_data.as_str(),
            connection_token.as_str(),
            protocol.as_str(),
            Some(tx),
        )
        .wait()
        .unwrap();

    let pm = self.proxies_map.clone();

    thread::spawn(move || {
        // Exit the loop cleanly once the sending side of the channel is dropped,
        // instead of panicking on a failed recv().
        while let Ok(vec) = rx.recv() {
            //...extract hub, method, args info from vec and invoke callback  -
            proxy
                .lock()
                .unwrap()
                .handle_message(method, args.as_array().unwrap().clone());
        }
    })
}

Again, a thread is spawned because the loop would otherwise block the main thread, and the user would not be able to call any other method in the library, for example to perform step 3 of the workflow above (invoke a server method).
This is where the problem appears.
A user can do something like:

...
let mut connection = HubConnectionBuilder::new("http://localhost:8080/");
let mut proxy = connection.create_hub_proxy("MyStockHub");
//Attach a callback for set_stock_price method invocation:
proxy.lock().unwrap().on_1_arg::<i32>(
    String::from("set_stock_price"),
    Box::new(|price| println!("Hmm stock price is {}", price)),
);
//Initiate steps 1 and 2:
connection.start().wait();

//Optional - Invoke a server method:
proxy.lock().unwrap().invoke(
    String::from("set_stock_price"),
    vec![&100],
    &mut connection,
)
    .wait()
    .unwrap();

If the method invocation at the end of the snippet above happens before the sender thread is scheduled, the callback might never be called, because the stream from step 2 has not been opened yet when the server sends out the event.

My questions, then, are:

  1. Have I chosen the right approach for dealing with the stream?
  2. How do I ensure that streaming is initiated before any method invocations are made?
  3. Are there any other libraries like hyper that can be used?

#2

There’s a lot of code to work through in your post, so I’m not sure about your exact situation. In general, you’d create different types that can do different things. If you need to wait on something specific before you can do something else, you’d give the wait function a return type that is the only thing allowing you to call the next step.
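
To make that a bit more concrete, here is a minimal sketch of the idea using only the standard library. The names `Disconnected`, `Streaming`, `start` and `invoke` are hypothetical and are not taken from your library; the point is only that `invoke` exists solely on the type that `start` returns, so it cannot be called before `start` has finished.

```rust
// Hypothetical types for illustration; none of these names come from the
// original library.

/// A connection that has not started streaming yet. It has no `invoke`.
pub struct Disconnected {
    url: String,
}

/// A connection whose stream is known to be running. Only this type can invoke.
pub struct Streaming {
    url: String,
    invoked: Vec<String>,
}

impl Disconnected {
    pub fn new(url: &str) -> Self {
        Disconnected { url: url.to_string() }
    }

    /// Consumes the unstarted connection and returns one that can invoke.
    /// A real implementation would negotiate, connect, and block here until
    /// the stream is established; this sketch only rejects an empty URL.
    pub fn start(self) -> Result<Streaming, String> {
        if self.url.is_empty() {
            return Err("empty URL".to_string());
        }
        Ok(Streaming { url: self.url, invoked: Vec::new() })
    }
}

impl Streaming {
    /// Records the call; a real client would send the request to the server.
    pub fn invoke(&mut self, method: &str) {
        self.invoked.push(method.to_string());
    }

    /// Methods invoked so far, in call order.
    pub fn invoked(&self) -> &[String] {
        &self.invoked
    }

    pub fn url(&self) -> &str {
        &self.url
    }
}
```

With this shape, `Disconnected::new(url).start()?.invoke("set_stock_price")` compiles, while calling `invoke` on a `Disconnected` value is a compile error. It does not by itself make `start` wait for the stream; it only guarantees the ordering once `start` does.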


#3

What I want is to make:

connection.start().wait();

wait until streaming has begun, which is done in a thread:

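thread::spawn(move || {
    let mut core = Core::new().unwrap();
    let client = Client::new(&core.handle());
    let work = client.request(request).and_then(|res| {
        // Forward every chunk of the response body through the channel.
        res.body().for_each(|chunk| {
            transmitter.send(chunk.to_vec()).expect("Sender error: ");
            future::ok::<_, _>(())
        })
    });
    // Report a stream failure instead of silently discarding the result.
    if let Err(e) = core.run(work) {
        eprintln!("Stream error: {:?}", e);
    }
});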

Since the scheduling of the above thread is not deterministic, can I use something instead of a thread to start streaming?
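
One option that keeps the thread is a readiness handshake: the streaming thread sends a signal on a second channel once the response has arrived, and the starting side blocks on that signal before returning. Below is a minimal sketch using only `std::sync::mpsc`. The function `start_stream` and its `chunks` parameter are hypothetical; `chunks` stands in for the hyper response body, and in the real code the signal would presumably be sent from inside the `and_then(|res| ...)` closure. Note the assumption that "response received" is a good enough definition of "streaming has begun"; whether the server has registered the subscription by then is up to the server.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Spawns the streaming thread and blocks until it reports that the stream
/// is open. `chunks` is a stand-in for the HTTP response body (hypothetical).
pub fn start_stream(chunks: Vec<Vec<u8>>) -> Result<Receiver<Vec<u8>>, String> {
    let (event_tx, event_rx) = channel::<Vec<u8>>();
    // A real client could send Err(..) here when the connect request fails.
    let (ready_tx, ready_rx) = channel::<Result<(), String>>();

    thread::spawn(move || {
        // Signal readiness before forwarding any data.
        if ready_tx.send(Ok(())).is_err() {
            return; // the caller gave up waiting
        }
        for chunk in chunks {
            if event_tx.send(chunk).is_err() {
                break; // receiver dropped; stop streaming
            }
        }
    });

    // Blocks until the thread signals readiness. If the thread exits first,
    // `ready_tx` is dropped and `recv` returns an error instead of hanging.
    match ready_rx.recv() {
        Ok(Ok(())) => Ok(event_rx),
        Ok(Err(e)) => Err(e),
        Err(_) => Err("streaming thread exited before connecting".to_string()),
    }
}
```

Because `start_stream` returns only after the signal arrives, any invocation made afterwards is ordered after the stream was opened, regardless of how the thread was scheduled.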