[SOLVED] Invoking synchronous code inside an executing future

You can only get a future’s return value there if you wait for it. Otherwise, you need to work with the value in the map block and continue returning the future to the caller. If you must have the result at this point, then you’ll need to block on the future’s result (ie wait).

Yes, you're right!
And this is the case where we can't return the value from handle.spawn() without blocking the event loop. So I think I should go another way here, like using a CpuPool with some limitations, as @avkonst mentioned before. However, if CpuPool::spawn_fn() is called, then the value has to be extracted via wait()... So how do I use the CpuPool correctly here, so that the outer future won't be blocked?

Most of the time, there shouldn't really be a need to call wait() on futures. You handle their results by chaining continuations (via the combinators). In the case of CpuPool, you spawn a function onto it, get a future back, and then you handle its completion the same way as other futures - attach a continuation to that returned future.

If you look at the process_request definition, you will notice that it returns a Result<WebSocketHeaders> value instead of a future. Inside the match expression we're trying to get a user identifier from the Redis instance via the get_user_id method. Even if that function returned a future (or just wrapped sync code in a task for the CpuPool, which is also a future), we would still have to "fire" it and wait for a definite result.

Applying map or map_err to the generated future isn't the issue; the issue is extracting the value (whatever it is, an error or a String instance) and returning it. And each future should finish with an Ok(()) value, which means you can't return a value or an error from the inner scope to the outside caller. And this is what's making me suffer :confused:

Is process_request part of overall request handling? I’m assuming that’s the case. Then this function should return a future itself, which resolves to WebSocketHeaders. Its caller(s) can then attach their own continuations and they’ll get access to the WebSocketHeaders value (or an error if the future fails). At some point you’ll have enough data resolved via the futures such that you can actually respond to the client by sending a response.

As mentioned above, you can return a future that resolves to the headers. Is there a reason you cannot return a future here?

This is middleware code that runs before the client starts working with the server. After a WebSocket connection is established, this middleware is applied only once, after the handshake. The goal of this middleware is to check the token specified in the headers and, if it contains any invalid data (for example, an expired token), just close the connection and do nothing else.

At this stage, I'm waiting for a response from Redis as soon as possible and validating the token before processing any further requests.

You can look at the actual code here:

  • Proxy, where we have already a tokio::reactor::Core
  • JWT Middleware that applies after establishing a connection (it hasn't been changed yet)

Ok, I think the approach is the same - your middleware should defer further handling of the request until the Redis future resolves. It defers by virtue of the future not executing until its result is ready. If you wait on the future, then you'll just block the event loop if the future isn't ready, preventing handling of other futures that may have completed.

Ok, at least we've established that a separate CpuPool would be a good solution here. But that raises a couple of questions:

  1. If I create a CPU pool with 5 workers, for example, does that mean the 5 workers can do their work concurrently when the wait() method is invoked?
  2. Will applying wait() to the CPU pool block only the CpuPool instance, and not the tokio::reactor::Core that is used for handling connections?

Hmm, I'm not sure I said that specifically :slight_smile:. Does the redis lib not use tokio? The CpuPool is a decent alternative for:

  • I/O code/libs that don't expose async functionality
  • Offloading CPU-heavy work

If the redis lib you're using supports tokio then you shouldn't need CpuPool.

When wait is called on what thread? If you call it on the reactor thread, then you'll block the event loop - the threads in the CpuPool can execute in parallel. If you call wait from the pooled thread then you'll block that thread. If you end up blocking all pooled threads then you can starve the system. 5 workers is too few if you intend to do blocking I/O from them.

I think the above answers this?

If the redis lib you’re using supports tokio then you shouldn’t need CpuPool.

For example, redis-async supports Tokio, but to avoid blocking the main loop I would have to create a new instance of a Core. So for this particular client library there would be a separate tokio::reactor::Core instance, which would be shared, and for each of those futures I'd need to call wait() to get a value. I'm not sure that's good for performance, whether wait() is invoked on the main loop or on the separate one (or am I mistaken, @avkonst?).

When wait is called on what thread?

I mean the situation where get_user_id is synchronous: a wrapper around a library like redis-rs that calls CpuPool.spawn_fn(...) inside and extracts the value with the wait() method. This function is invoked inside existing futures, as mentioned before.

So, if I write code like this:

let pool = CpuPool::new_num_cpus();

let prime = pool.spawn_fn(|| {
    Ok(is_prime(BIG_PRIME))
});

println!("{:?}", prime.wait());

It will block only the CpuPool's thread and won't touch the main event loop, right?

A tokio Core is an event loop (aka reactor). It’s a specific type of a reactor that is internally driven by the mio library. Mio, in turn, is an abstraction over various OS’s notification mechanisms. This notification mechanism is mostly for asynchronous socket I/O. All that to say it’s not a general purpose background thread or threadpool.

If you’re waiting on futures then you’re probably doing it wrong :slight_smile:. As mentioned before, there shouldn’t be a need to wait on them. Use continuations to do work once the future resolves (ie completes).

No, it’ll block the thread you’re spawning futures from, not the thread in the pool. If you were offloading to a CpuPool, you’d use the same mechanism to handle the future completion - tack on a continuation. If you spawn a future onto a CpuPool and then wait on it inside a future running on the Core, you’ll block the Core. Don’t do that.

It will block the thread calling wait(). If you run this code within the event reactor's context, it will block the core. Possibly indefinitely, as the waiting future blocks and cannot be polled again.
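To make the point about which thread gets blocked concrete, here is a minimal std-only sketch. It is not CpuPool itself: it assumes a plain worker thread plus an mpsc channel as a stand-in for `spawn_fn(...).wait()`, and `spawn_and_wait` and `slow_answer` are hypothetical names used only for illustration.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Runs `work` on a separate worker thread, then blocks the *calling* thread
/// until the result arrives (a std analogue of `spawn_fn(...).wait()`).
/// Returns the result and how long the caller was stuck waiting.
fn spawn_and_wait<T, F>(work: F) -> (T, Duration)
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The work itself runs on the worker thread.
        let _ = tx.send(work());
    });

    let started = Instant::now();
    // `recv` parks the thread that calls it, not the worker.
    let value = rx.recv().expect("worker exited without sending a result");
    (value, started.elapsed())
}

/// Hypothetical stand-in for a slow, blocking computation.
fn slow_answer() -> u32 {
    thread::sleep(Duration::from_millis(50));
    42
}
```

Calling `spawn_and_wait(slow_answer)` from a thread that also drives an event loop would stall that loop for roughly the duration of the work, which is the behaviour described above.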

I've got a quick example for backing a tokio core with a thread pool here. Obviously, quite a bit of the management around core and pool handles is band-aid and should be solved in a different fashion.

@skade @vitalyd, let me explain why I can't return a future from deep in the code and work with it via the .map()/.map_err() methods:

  1. Right now I'm using tokio-tungstenite for handling connections and processing requests from clients. To process headers, I have to use the accept_hdr_async method, whose second argument must be a struct that implements the Callback trait. Or... it could be a closure like this, but either way it must return the Result<Option<Vec<(String, String)>>> type:
let auth_middleware_callback = |request: &Request| {
    let handle_inner = handle.clone();
    let auth_middleware_inner = self.auth_middleware.clone();
    let processing_result = auth_middleware_inner.borrow().process_request(request, &handle_inner);

    match processing_result {
        Ok(headers) => Ok(headers),
        Err(err) => {
            let formatted_error = format!("{}", err);
            let message = Cow::from(formatted_error);
            Err(TungsteniteError::Protocol(message))
        }
    }
};

accept_hdr_async(stream, auth_middleware_callback)
  2. If you want to use those headers inside the main processing function (for each request), you have to copy them from the callback to a variable outside the callback's scope. That's probably not a good way to do it, even when you have a good reason.

The next thing I'd like to mention is chaining all of those potential futures together inside the main part of the processing. I have the following piece of code, which works fine without any additional changes for Redis/Kafka futures:

let server = socket.incoming().for_each(|(stream, addr)| {
    let engine_inner = self.engine.clone();
    let connections_inner = self.connections.clone();
    let handle_inner = handle.clone();

    let auth_middleware_callback = |request: &Request| {
        let handle_inner = handle.clone();
        let auth_middleware_inner = self.auth_middleware.clone();
        let processing_result = auth_middleware_inner.borrow().process_request(request, &handle_inner);

        match processing_result {
            Ok(headers) => Ok(headers),
            Err(err) => {
                let formatted_error = format!("{}", err);
                let message = Cow::from(formatted_error);
                Err(TungsteniteError::Protocol(message))
            }
        }
    };

    accept_hdr_async(stream, auth_middleware_callback)
        .and_then(move |ws_stream| {
            let (tx, rx) = mpsc::unbounded();
            connections_inner.borrow_mut().insert(addr, tx);

            // Split the WebSocket stream so we can work with the reading and writing halves separately.
            let (sink, stream) = ws_stream.split();

            // Read and process each message
            let connections = connections_inner.clone();
            let ws_reader = stream.for_each(move |message: Message| {
                engine_inner.borrow().handle(&message, &addr, &connections);
                Ok(())
            });

            // Write back prepared responses
            let ws_writer = rx.fold(sink, |mut sink, msg| {
                sink.start_send(msg).unwrap();
                Ok(sink)
            });

            // Wait for either half to be done to tear down the other
            let connection = ws_reader.map(|_| ()).map_err(|_| ())
                                      .select(ws_writer.map(|_| ()).map_err(|_| ()));

            handle_inner.spawn(connection.then(move |_| {
                connections_inner.borrow_mut().remove(&addr);
                println!("Connection {} closed.", addr);
                Ok(())
            }));

            Ok(())
        })
        .or_else(|err| {
            println!("{}", err.description());
            Ok(())
        })
});

In this case, inside the ws_reader future we process each message sent by a client: the engine.handle(...) call does some work (deserializing JSON, validating it, etc.) and returns responses by writing them into the tx part of a channel.

As far as I understand, even if I change the codebase (don't worry, I'm working on it alone, so it's possible) so that the copied headers are passed into the main part of request processing, we still end up with a nested future inside ws_reader (right?) that must be resolved somehow.

Frankly speaking, I have no idea what it should look like. Maybe like the following code (please correct me if I've made mistakes somewhere):

accept_hdr_async(stream, auth_middleware_callback)
    // Process the messages
    .and_then(move |ws_stream| {
        let (tx, rx) = mpsc::unbounded();
        connections_inner.borrow_mut().insert(addr, tx);

        let (sink, stream) = ws_stream.split();

        // Headers are copied to the outer scope and passed into this future.
        // If token is invalid or expired, then send the error message and close the connection.
        let redis_future = get_redis_future(&headers)
            .map_err(move |err| {
                let formatted_error = format!("{}", err);
                let error_message = engine_inner.wrap_an_error(formatted_error.as_str());
                tx.unbounded_send(error_message).unwrap();
                tungstenite::Error::Protocol(Cow::from("Invalid token"))
            })
            .map(|_| ());        

        // The main processing loop. It should run ONLY after the Redis step
        // and token validation have passed. Otherwise this code should
        // not be fired.

        // Read and process each message
        let connections = connections_inner.clone();
        let ws_reader = stream.for_each(move |message: Message| {
            let message: tungstenite::Message = engine_inner.borrow().handle(&message, &addr, &connections);
            // Not sure about this particular piece of code; it is just one of the possible versions
            let kafka_client_future = get_kafka_client()
                // publish the message
                .map(|| {
                    kafka_some_api.write(message);
                    kafka_some_api.subscribe(...)
                })
                // wait here for response in JSON
                .map(|response_message| {
                    let response = engine.get_response_message(response_message);
                    tx.unbounded_send(response).unwrap();
                });
            handle.spawn(kafka_client_future);
            Ok(())
        });

        // Write back prepared responses
        let ws_writer = rx.fold(sink, |mut sink, msg| {
            sink.start_send(msg).unwrap();
            Ok(sink)
        });

        // Here we should resolve the futures, so that redis_future is fired only once,
        // after establishing a connection. If everything is fine, we go on to do
        // the useful work.

        // Wait for either half to be done to tear down the other
        let connection = ws_reader.map(|_| ()).map_err(|_| ())
                                  .select(ws_writer.map(|_| ()).map_err(|_| ()));

        let final_future = redis_future.select(connection);

        handle_inner.spawn(final_future.then(move |_| {
            connections_inner.borrow_mut().remove(&addr);
            println!("Connection {} closed.", addr);
            Ok(())
        }));

        Ok(())
    })
    .or_else(|err| {
        println!("{}", err.description());
        Ok(())
    })

When writing async code with futures, you should not need to invoke the spawn and wait functions. A thread should deal only with futures or only with blocking calls, but not both at once. Async code can invoke blocking calls if there are compensating threads for the event-processing threads (not available in Rust). Async code can also offload blocking calls to a pool of worker threads and receive the results back asynchronously. Sync code can invoke async code and make it synchronous again by waiting for the async result in a blocking fashion. These are fundamental things which should guide your design. So have a look at where you violate these principles.
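The "offload blocking calls to worker threads and receive results back asynchronously" idea can be sketched with the standard library alone. This is only an illustration under stated assumptions: a plain loop stands in for the event loop, a single worker thread stands in for a pool, and `run_loop_with_offload` is a hypothetical name. A real reactor would be notified when the result is ready rather than polling on a timer.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

/// Offloads `blocking_call` to a worker thread and keeps the calling loop
/// running by checking for the result instead of blocking on it.
/// Returns the result plus the number of loop turns that ran meanwhile.
fn run_loop_with_offload<T, F>(blocking_call: F) -> (T, u64)
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The blocking call runs here, away from the loop below.
        let _ = tx.send(blocking_call());
    });

    let mut turns = 0u64;
    loop {
        // Stand-in for one turn of other work (e.g. serving other connections).
        turns += 1;
        thread::sleep(Duration::from_millis(1));

        // Non-blocking check: returns immediately whether or not a result exists.
        match rx.try_recv() {
            Ok(value) => return (value, turns),
            Err(TryRecvError::Empty) => continue,
            Err(TryRecvError::Disconnected) => panic!("worker exited without a result"),
        }
    }
}
```

The loop thread never parks on the worker; it only picks up the value once it is available, which is the property the continuation-based approach gives you with futures.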

The way I’d try to do this is by adding a future between accept_hdr_async and the and_then that you have now:

...accept_hdr_async(...)
   .and_then(<new code here>)
   .and_then(<your existing code>)

This future would do the Redis lookup using the headers you captured (it’s not clear to me how you can do this on a per connection basis but I’m assuming it’s doable). The future would resolve to the websocketstream (or whatever accept_hdr_async returns) by forwarding the value that accept_hdr_async gave it or an error. This way your current logic in the and_then block doesn’t run until the Redis part is done, and the Redis part is done asynchronously.
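The shape of that extra step can be shown with `Result`, whose `and_then`/`map` combinators have the same names and chaining shape as the futures 0.1 ones. This is a synchronous analogy only, not the tokio-tungstenite API: `WsStream`, `Headers`, `validate_token` and `handle_connection` are hypothetical stand-ins, and the token check replaces the real Redis lookup.

```rust
/// Hypothetical stand-in for the accepted WebSocket stream.
#[derive(Debug, PartialEq)]
struct WsStream(u32);

/// Hypothetical stand-in for the captured handshake headers.
type Headers = Vec<(String, String)>;

/// Stand-in for the Redis lookup: succeeds only for a known token.
fn validate_token(headers: &Headers) -> Result<(), String> {
    let token = headers
        .iter()
        .find(|pair| pair.0 == "token")
        .map(|pair| pair.1.as_str());
    match token {
        Some("valid") => Ok(()),
        Some(_) => Err("invalid or expired token".to_string()),
        None => Err("missing token".to_string()),
    }
}

/// Chains the new validation step in front of the existing handling step.
fn handle_connection(accepted: Result<WsStream, String>, headers: &Headers) -> Result<u32, String> {
    accepted
        // New step: validate, then forward the stream it was given unchanged.
        .and_then(|ws| validate_token(headers).map(move |_| ws))
        // Existing step: only runs if the validation step succeeded.
        .and_then(|ws| Ok(ws.0))
}
```

With futures, the first closure would return the lookup future mapped to the stream, so the second step is deferred until the lookup resolves and is skipped entirely on error.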

The idea for dealing with the situation where we aren't using secured mode is pretty simple: a future that does nothing with the token and headers (copied from my codebase as is):

pub struct EmptyMiddleware;

impl EmptyMiddleware {
    pub fn new(_cli: &CliOptions) -> EmptyMiddleware {
        EmptyMiddleware {}
    }
}

impl Middleware for EmptyMiddleware {
    fn process_request(&self, _headers: &Headers, _handle: &Handle) -> MiddlewareFuture {
        Box::new( lazy(move || { Ok(()) }) )
    }
}

So when we're going to use Redis, we just replace it with another middleware (any you wish) that implements the Middleware trait. This idea is probably close to the "Null Object" pattern.
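A rough sketch of how such a swap might be wired up at startup, assuming a simplified synchronous trait that returns `Result` instead of `MiddlewareFuture`. `TokenMiddleware`, `build_middleware` and the hard-coded `"secret"` token are hypothetical and only illustrate the Null Object idea.

```rust
/// Hypothetical stand-in for the handshake headers.
type Headers = Vec<(String, String)>;

/// Simplified, synchronous version of the middleware trait.
trait Middleware {
    fn process_request(&self, headers: &Headers) -> Result<(), String>;
}

/// The "null object": accepts every request without looking at it.
struct EmptyMiddleware;

impl Middleware for EmptyMiddleware {
    fn process_request(&self, _headers: &Headers) -> Result<(), String> {
        Ok(())
    }
}

/// Hypothetical secured variant that checks a token header.
struct TokenMiddleware {
    expected_token: String,
}

impl Middleware for TokenMiddleware {
    fn process_request(&self, headers: &Headers) -> Result<(), String> {
        let matches = headers
            .iter()
            .any(|pair| pair.0 == "token" && pair.1 == self.expected_token);
        if matches {
            Ok(())
        } else {
            Err("invalid token".to_string())
        }
    }
}

/// Picks the implementation once; callers only ever see the trait.
fn build_middleware(secured: bool) -> Box<dyn Middleware> {
    if secured {
        Box::new(TokenMiddleware { expected_token: "secret".to_string() })
    } else {
        Box::new(EmptyMiddleware)
    }
}
```

The calling code stays identical in both modes, since it only calls `process_request` on whatever `build_middleware` returned.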

Presumably you can set up the entire future chain differently when secure mode is disabled.

Agreed about wait(), not so much about spawn(): if one spawns a future on the same event loop which is handling the futures chain of an active task, that future becomes just another task handled by the event loop. Synchronization between the original task and the spawned one is not automatic and must be set up explicitly, if desired.

I have two examples of spawning subtasks in my LDAP library: issuing an Abandon for a Search, polled for explicitly inside the polling loop of the search stream, and handling a paged Search, a kind of tail-recursive chain of Search operations, which reuses the original stream and its polling loop.

Thanks.

FWIW, tokio added support for blocking:
