Warp, tokio, and futures troubles

I'm playing with warp, and it seems really good, but I feel like I still don't have a full grasp of futures:

I'm trying to write some code that blocks (on database access) using tokio_threadpool::blocking and futures::poll_fn. The code below contains a function that works, and 2 further functions that attempt to extract the combinator logic but fail to compile. Is there a way to do what I'm trying to do? The error is:

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
   --> src/bin/server.rs:126:34
    |
123 | fn blocking<T, F>(f: F) -> impl Future<Item=T, Error=Error>
    |                   - captured outer variable
...
126 |     poll_fn(move || blocking_tmp(f))
    |                                  ^ cannot move out of captured outer variable in an `FnMut` closure

error: aborting due to previous error

Here is my code

use failure::Error;

fn process_enquiry(enquiry: Enquiry) -> impl Future<Item=(), Error=Error> {

    poll_fn(move || tokio_threadpool::blocking(|| {
        // My logic
        Ok(())
    }))
    .map_err(Error::from)
    .flatten()
}

// now my attempt at abstraction

fn blocking<T, F>(f: F) -> impl Future<Item=T, Error=Error>
where F: FnOnce() -> Result<T, Error>
{
    poll_fn(move || blocking_inner(f))
        .map_err(Error::from)
        .flatten()
}

fn blocking_inner<T, F>(f: F) -> futures::Poll<Result<T, Error>, Error>
where F: FnOnce() -> Result<T, Error>
{
    tokio_threadpool::blocking(f).map_err(Error::from)
}

Any time you see a "cannot move out of captured outer variable in an ..." error, it's very likely because you need an Fn or FnMut (the message will say, like in this case it's an FnMut), which must be callable multiple times, but the code as written consumes something and therefore doesn't allow the closure to be callable multiple times (or otherwise, using/creating an FnOnce instead).

In this case, blocking_inner takes F by value and so the compiler sees that the captured f in blocking is moved into the closure, which is good, but then it's moved by virtue of calling blocking_inner.

So, if you want to call a closure repeatedly (that's what poll_fn does, essentially), you shouldn't be giving it an FnOnce.

1 Like

I'm sorry I re-read this and it misses some important information, namely that enquiry is used in the inner function (that is FnOnce). So blocking is run once to generate the Poll.

But I don't understand why the compiler doesn't say "I can see "f" can only be called once, but that's ok since blocking_inner will only call it once".

EDIT I see, it's the function itself that can't be moved?

Right, the issue isn't blocking_inner - that one, as written, is fine in isolation. The problem is in blocking, which returns a future that's poll_fn based; poll_fn needs an FnMut, so the closure you give it must be callable multiple times. As such, f in blocking cannot be consumed in any way inside the closure, but blocking_inner does exactly that.

1 Like

I'm not sure there's an obvious solution :cry:

The easiest solution is to do:

fn blocking<T, F>(mut f: F) -> impl Future<Item = T, Error = Error>
where
    F: FnMut() -> Result<T, Error>,
{
    poll_fn(move || blocking_inner(&mut f))
        .map_err(Error::from)
        .flatten()
}

That is, make blocking take an FnMut and then pass a mutable borrow of that to blocking_inner.

Alternatively, stay with FnOnce in blocking but add an additional Clone requirement on F, and do blocking_inner(f.clone()) in the closure. Whether this is workable (or even preferred) depends on whether the closure you expect to use is truly clonable (and you want to clone). But it's a possible option.

How exactly did you want to make use of blocking?

2 Likes

By the way, here is a playground that roughly mirrors your situation - you can use that to demonstrate what you'd like to do. I put the suggested FnMut approach into it for now.

2 Likes

I think this solves my problem! Thankyou!