[SOLVED] Invoking synchronous code inside an executing future

I'm trying to get to grips with the tokio / futures crates and have a couple of questions about them:

  • How can I invoke synchronous code inside an executing future so that it doesn't block and slow down the event loop?

    In my case I have an open WebSocket connection. Once it is established, we check a token in the header before doing anything else. If the token doesn't exist in the Redis instance, or an error occurs during validation, we close the connection and do nothing further. Otherwise we go on to process requests from that client (for example, wrap the request, transform it and send it to a Kafka instance).

  • Do the tokio / futures crates have an established approach to this issue, or some "conventions" for dealing with it? For example, I thought a lot about using "CpuPool": wrapping each Redis/Kafka call in a task and running it on a separate "CpuPool" instance. But I'm not sure it will give me good performance in this case.

If I understood you correctly, there are two methods on futures that may help: wait() and then(). See https://tokio.rs/docs/getting-started/futures/ for examples of both.

Thanks

I have searched for the same thing. In Scala it is supported by the blocking context (https://docs.scala-lang.org/overviews/core/futures.html#blocking), where the thread pool (in the default implementation) is expanded with compensating threads for calls marked as 'blocking'. Something similar exists in C#. It seems Tokio does not support this.

Note: a thread pool sized to the CPU count might not be sufficient to handle blocking calls. A good rule is to use a pool with a number of threads equal to or greater than 'number of simultaneously allowed blocked calls + 1', capped by some constant such as 1024 threads.
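To make the sizing rule concrete, here is a minimal sketch in plain Rust. The names `pool_size` and `MAX_THREADS` are made up for illustration and are not part of tokio or futures; the function returns the smallest size the rule allows, capped at the constant.

```rust
/// Upper bound on worker threads; 1024 is the example cap from the note above.
const MAX_THREADS: usize = 1024;

/// Hypothetical helper: the minimum pool size for a given number of
/// simultaneously allowed blocked calls, limited to `MAX_THREADS`.
fn pool_size(max_blocked_calls: usize) -> usize {
    // One extra thread stays free even when every allowed call is blocked.
    max_blocked_calls.saturating_add(1).min(MAX_THREADS)
}
```

For example, allowing 8 simultaneous blocking calls gives a pool of 9 threads, while allowing 5000 is capped at 1024.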

As the documentation mentions, the wait() method blocks the thread until the future is resolved. If I call it somewhere inside an executing future (for example, after passing in the tokio::reactor::Handle), we will take a performance hit.

Your approach of delegating blocking calls to worker threads and communicating the results asynchronously back to the thread executing the futures is the right option in this situation.
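A rough sketch of that idea using only the standard library, so it is an illustration rather than the CpuPool API. `blocking_lookup` is a hypothetical stand-in for a synchronous Redis call, and `offload` is an invented helper name.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a blocking call such as a synchronous Redis lookup.
fn blocking_lookup(token: &str) -> Result<String, String> {
    thread::sleep(Duration::from_millis(50)); // simulate blocking I/O
    if token == "valid-token" {
        Ok(String::from("user-42"))
    } else {
        Err(String::from("Token is expired or doesn't exist."))
    }
}

/// Runs the blocking call on a worker thread and returns immediately with
/// the receiving end of a channel on which the result will arrive.
fn offload(token: String) -> Receiver<Result<String, String>> {
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        // The caller may have dropped the receiver; ignore that send error.
        let _ = sender.send(blocking_lookup(&token));
    });
    receiver
}
```

The caller can check the receiver with try_recv() without blocking. In a real event loop a futures-aware channel would presumably replace std's mpsc, so that the executing future is woken when the result is ready.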

Yes, you are right. wait() will not help you here. It is meant for another purpose.

If you support the idea of a blocking context, I think it is worth creating a feature request for it. I asked about this in another thread some time ago, but it didn't get much traction.

Did you check then()?

As far as I remember, 'then' is a combinator that builds a continuation future which acts on the result. I am not aware of any combinators that know about a blocking context.

As the docs mention, then() returns a new future when it is called. Futures are great, but in my case I would have to create another future in order to use then() (for example, spawn it via tokio::reactor::Handle and the "redis-async" crate, which is implemented for use with Tokio). But that will be asynchronous code. How should I get the result of this future if it is executed inside an existing future?

If all your dependent calls are async and non-blocking, you can safely create a future inside an executing future and flat-map the result using the and_then combinator.

That's a good idea, provided it hasn't already been implemented in the tokio or futures crates, especially since others must have faced this issue before.
Are there perhaps other ways to solve it? I'm asking mainly because I'm trying to implement a reverse proxy (WebSocket-over-Kafka) for my own project: it accepts client requests (JSON messages over WebSocket), applies a certain transformation to each request, passes it to Kafka, waits some time for a response, and returns that response as is.

If the Kafka client is fully async, see my previous post :) I understood that you have blocking calls to make.

Hm, so my understanding is that what is needed here is a smart CpuPool. The current futures_cpupool has only a fixed number of threads, but we want to dynamically add new threads if all current threads are waiting on blocking IO, and remove threads if all current ones are CPU bound.

In theory, this can live as a separate crate, futures_dynamic_cpupool. A quick search on crates.io didn't reveal anything similar already written, so it might actually be a very fun project to write!
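To give a feel for what such a pool might look like, here is a small std-only sketch. Everything in it (`GrowingPool`, `execute`, `thread_count`) is hypothetical and unrelated to the real futures_cpupool API. It only grows: a worker is added whenever unfinished jobs outnumber workers, up to a cap. Removing idle threads and integrating with futures are left out.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical pool that adds a worker whenever unfinished jobs outnumber workers.
struct GrowingPool {
    sender: Sender<Job>,
    queue: Arc<Mutex<Receiver<Job>>>,
    outstanding: Arc<AtomicUsize>, // jobs submitted but not yet finished
    threads: usize,
    max_threads: usize,
}

impl GrowingPool {
    fn new(max_threads: usize) -> Self {
        let (sender, receiver) = mpsc::channel();
        GrowingPool {
            sender,
            queue: Arc::new(Mutex::new(receiver)),
            outstanding: Arc::new(AtomicUsize::new(0)),
            threads: 0,
            max_threads: max_threads.max(1), // always allow at least one worker
        }
    }

    fn thread_count(&self) -> usize {
        self.threads
    }

    fn execute<F: FnOnce() + Send + 'static>(&mut self, job: F) {
        let outstanding = self.outstanding.fetch_add(1, Ordering::SeqCst) + 1;
        // Grow only if every existing worker already has work, and the cap allows it.
        if outstanding > self.threads && self.threads < self.max_threads {
            self.spawn_worker();
        }
        self.sender.send(Box::new(job)).expect("the pool owns the receiving end");
    }

    fn spawn_worker(&mut self) {
        let queue = Arc::clone(&self.queue);
        let outstanding = Arc::clone(&self.outstanding);
        self.threads += 1;
        thread::spawn(move || loop {
            // The lock guard is dropped at the end of this statement, before the job runs.
            let next = queue.lock().unwrap().recv();
            match next {
                Ok(job) => {
                    job();
                    outstanding.fetch_sub(1, Ordering::SeqCst);
                }
                Err(_) => break, // the pool was dropped, so no more jobs will arrive
            }
        });
    }
}
```

With this heuristic, three long-blocking jobs submitted to an empty pool start three workers, while short jobs tend to reuse existing workers. A real implementation would also need the blocked/CPU-bound distinction discussed above, which this sketch does not attempt.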

Okay, here is a code example that tries to do what I described. Currently I'm stuck on getting a result from the Redis instance. It looks like this:

// This uses the "redis-async" crate
// This function is executed inside an existing future. Just pass the "handle" as is
fn get_user_id(&self, raw_token: String, handle: &Handle) -> Result<String> {
    let redis_socket_address = self.redis_address.parse().unwrap();
    let redis_connection = paired_connect(&redis_socket_address, handle);

    let get_user_id_future = redis_connection.and_then(move |connection| {
        // Get the User ID from Redis by the token
        connection.send::<String>(resp_array!["GET", raw_token])
    });

    // I need to extract the final result here to understand what happened
    let mut processing_result;
    handle.spawn(get_user_id_future.then(move |response| {
        processing_result = match response {
            Ok(user_id) => Ok(String::from(user_id)),
            Err(_) => {
                let message = String::from("Token is expired or doesn't exist.");
                Err(PathfinderError::AuthenticationError(message))
            }
        };
        Ok(())
    }));

    processing_result
}

But I have no idea how to make it better. Could someone help me here?
If I changed it to a synchronous version, that would be fine for me too, mostly because I lack experience with futures and I'd like it to work more or less stably and without any issues :)

This is one way of implementing this. The pool does not have to be smart :) A programmer can wrap blocking calls with a marker that explicitly tells / allows the pool to grow if there are tasks queued for execution. This is how Scala does it, and it works quite well for me. In general, blocking calls are undesirable in event-driven software, but sometimes you need one for compatibility, legacy or dependency reasons. Scala provides such a tool, and I use it carefully to make sure it is not on a critical path and does not cause starvation.
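As a loose illustration of the marker idea, here is a std-only sketch. The `blocking` wrapper and the `BLOCKED` counter are hypothetical and only modelled on Scala's `blocking { ... }`; a pool would have to read the counter and start compensating threads itself, which is not shown.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Number of threads currently inside a `blocking` section (hypothetical design).
static BLOCKED: AtomicUsize = AtomicUsize::new(0);

/// Decrements the counter when dropped, so it is restored even if the call panics.
struct BlockedGuard;

impl Drop for BlockedGuard {
    fn drop(&mut self) {
        BLOCKED.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Marks `f` as a blocking call for the duration of its execution.
fn blocking<T>(f: impl FnOnce() -> T) -> T {
    BLOCKED.fetch_add(1, Ordering::SeqCst);
    let _guard = BlockedGuard;
    f()
}

/// What a pool could consult when deciding whether to grow.
fn blocked_threads() -> usize {
    BLOCKED.load(Ordering::SeqCst)
}
```

A call site would then read something like `let user_id = blocking(|| sync_redis_get(&token));`, where `sync_redis_get` is an invented name for whatever synchronous client call is being wrapped.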

One way to fix your function is to make its result type a boxed future, Box<Future<Item = String, Error = PathfinderError>>, and fulfil the result asynchronously. Instead of handle.spawn, use 'get_user_id_future.map(...)' and 'get_user_id_future.map_err(...)'.

Are these changes correct?

fn get_user_id(&self, raw_token: String, handle: &Handle) -> Box<Future<Item = String, Error = PathfinderError>> {
        let redis_socket_address = self.redis_address.parse().unwrap();
        let redis_connection = paired_connect(&redis_socket_address, handle);

        // Make the authentication before, if a password was specified.
        redis_connection
            .and_then(move |connection| {
                // Get the User ID from Redis by the token
                connection.send::<String>(resp_array!["GET", raw_token])
            })
            .map_err(move |_err| {
                let message = String::from("Token is expired or doesn't exist.");
                Err(PathfinderError::AuthenticationError(message))
            })
            .map(move |user_id| {
                Ok(String::from(user_id))
            })
    }

Yes, I had a similar idea. You need to wrap the returned result in a Box. Note: I am not the best person to verify Rust code; I am still learning and am guided by the compiler myself.

How can I extract the result from the returned future at the place where this function is called? After getting the boxed future, I receive this error:

error[E0308]: mismatched types
   --> src/token/middleware.rs:107:69
    |
107 |                  let validation_struct = self.get_validation_struct(&user_id);
    |                                                                     ^^^^^^^^ expected str, found struct `futures::Map`
    |
    = note: expected type `&str`
               found type `&futures::Map<futures::MapErr<std::boxed::Box<futures::Future<Item=std::string::String, Error=error::PathfinderError>>, [closure@src/token/middleware.rs:100:30: 102:22]>, [closure@src/token/middleware.rs:103:26: 105:22]>`

Current function state:

fn get_user_id(&self, raw_token: String, handle: &Handle) -> Box<Future<Item=String, Error=PathfinderError> + 'static> {
    let redis_socket_address = self.redis_address.parse().unwrap();
    let redis_connection = paired_connect(&redis_socket_address, handle);

    // Make the authentication before, if a password was specified.
    Box::new(
        redis_connection
            .and_then(move |connection| {
                // Get the User ID from Redis by the token
                connection.send::<String>(resp_array!["GET", raw_token])
            })
            .map_err(move |_err| {
                let message = String::from("Token is expired or doesn't exist.");
                PathfinderError::AuthenticationError(message)
            })
            .map(move |user_id| {
                String::from(user_id)
            })
    )
}

And the place where it is invoked:

fn process_request(&self, request: &Request, handle: &Handle) -> Result<WebSocketHeaders> {
    match request.headers.find_first("Sec-WebSocket-Protocol") {
         Some(raw_token) => {
             // Try to fetch token after handshake
             let extracted_token = self.extract_token_from_header(raw_token)?;

             // Validate the passed token with request
             let user_id = self.get_user_id(extracted_token.clone(), handle)
                .map_err(move |reason| {
                    return Err(reason)
                })
                .map(move |result| {
                    result
                });

             let validation_struct = self.get_validation_struct(&user_id);
             let _token = validate_token(&extracted_token, &self.jwt_secret, &validation_struct)?;

             // Return validated header as is
             let extra_headers = vec![(String::from("Sec-WebSocket-Protocol"), extracted_token)];
             Ok(Some(extra_headers))
         },
         None => {
             let message = String::from("Token was not specified.");
             Err(PathfinderError::AuthenticationError(message))
         }
    }
}