Chaining futures


#6

Handle can be cloned as many times as you want, so use that to your advantage. Make a clone if needed, and store it by value in the closure environment (use move if needed).

There are very few situations where you are able to store a non-'static object inside the closure environment of a Future.


#7

As @Fylwind mentioned, you should clone the Handle and move that clone into a closure. In your case, your first closure ends up moving the handle, thus making it unavailable in your second (commented out) closure.


#8

Right, cloning things was what i stumbled on. I needed references to the same thing in a couple of different places, so I cloned enough copies and used “move” to transfer ownership. Specifically, two copies of handle (hc and hc2), an extra copy of the command string (access_cmd cloned as revoke_cmd), and an extra copy of req.addr as req_addr. Is this the technique? It makes sense, but seems strange.

This is what it looks like now:

    let hc = handle.clone();
    let hc2 = handle.clone();
    let revoke_cmd = format!("{}", access_cmd);
    let req_addr = req.addr.to_string();
    
    let grant_then_revoke = Command::new(access_cmd).arg("start").arg(format!("{}", &req.addr))
        .output_async(&handle)
        .then(move |_| {
            Timeout::new(Duration::from_secs(duration), &hc).unwrap()
        })
        .then(move |_| {
            Command::new(revoke_cmd).arg("stop").arg(req_addr)
                .output_async(&hc2)
        })
        .then(|_| {
            future::ok(())
        });

I think the next refinement is replacing then with and_then in some places because that enables the sequence to stop if the future arrives and the result is a failure. I’ll try to make that work and see what I come up with.


#9

You can probably avoid some of the cloning by returning the state (e.g. req) from the futures in the chain - “pass the puck”, so to speak. Your closures receive arguments, which you currently ignore (via the placeholder ‘_’) but if you make a closure return the state it received, it’ll be available to the next future, and so on. This way you’re moving state in and out of the closures/futures in your chain.


#10

Passing the puck is definitely what I’m after. I’d started to use the closure’s argument in a couple cases. so, for example, I could print the standard output of the command after it executes. As you’ve pointed out, there’s a further use of the closure argument: to pass along state as the computation proceeds.

Using map() seemed like the way to do this: creating a future that carries the state along. This one example eliminates the need for the revoke_cmd variable in the earlier example:

.and_then(move |o| {
            print!("{}", str::from_utf8(&o.stdout).unwrap());
            Timeout::new(Duration::from_secs(duration), &hc).unwrap()
        }).map(|_| { req })
        .then(move |req| {
            Command::new(revoke_cmd).arg("stop").arg(req.unwrap().addr.to_string())
                .output_async(&hc2)
        })

The map() creates a Future that resolves to an instance of my state struct, which is passed to the closure as req. It arrives as a Result (so must be unwrapped) and is then available for use.

I’ll see if I can refine this further and eliminate other clones, which will further clean things up.


#11

I tried to transfer the state data through my futures instead of creating a bunch of clones, but got lost in all the attempts at making that work.

At a high level, the problems seem to be:

  • Some of the data are in the scope of main(), which may not outlive the closure functions. I don’t understand how this is possible, but assuming it is, I don’t know how to say: it’s ok it reference this essentially global data.

  • using a value more than once is tricky. The use of move presents a conundrum: it moves all the variables in a scope into the closure, making them inaccessible to subsequent closures. You might even “inadvertently” move things that you’d want to move later. But if you don’t use move, then there are lifetime-related problems detailed below.

In this program there are 4 futures, an initial one and 3 more chained to the first. Future 1 has access to the data it needs and is not a problem. Futures 2 and 3 are connected with the then or and_then combinators. The closures therein are where the problems lie. The 4th closure needs no data other than the result of the previous future. That’s passed to the closure, so it is not a problem.

This is the data:

  1. access_cmd: A String in scope of main(). Needed twice (1st and 3rd future). Trying to access it in a closure is forbidden because it borrows the value from the main() scope and that may not live long enough. The specific error is borrowed value must be valid for the static lifetime...

  2. o: An Output passed as a closure. needed twice (2nd and 4th futures). This one poses no ownership problem because the combinator passes the value as a closure argument.

  3. duration: A u64, in the scope of main(). Needed once (2nd future). Because it’s used only once, the closure that uses it can simply be a move closure and the value will be consumed. This solution is sufficient because duration is needed nowhere else.

  4. handle: A Handle in the scope of main(). Needed three times (1st, 2nd and 3rd futures). Trying to access it in the closures of the 2nd and 3rd is troublesome. The closure may outlive the current function (main()). Converting the closure to a move closure results in this error message: cannot move out of captured outer variable in an FnMut closure, which I don’t understand. Finally creating a single clone and attempting to use it in two different closures doesn’t work because the second closure attempts to capture the value after it moved into the first closure. The solution here was to create two clones of handle, named hc and hc2 and use them in the two closures than needed it. This seems naive.

  5. req_addr: a String in the scope of the for_each loop. Needed twice (1st and 3rd futures). Using it in the 3rd future’s closure results in a similar problem as handle. req_addr is inadvertently made inaccessible by an earlier closure that moved the data, so isn’t accessible.

So, a host of problems that all seem like they need different solutions. I’m not sure if each one should be solved individually or if there’s a good technique to apply to all of them.


#12

Is it possible to make sure the closures don’t outlive main, such as joining the threads, using scoped threads or by other means?


#13

I’m not familiar with any of those things, so not sure. I went pretty far into the weeds here, so in order to not get further into “XY problem” territory, here’s a restatement:

My tokio-core based application has data that is needed at different stages of a sequence of chained-together futures. Some of it is data from the program’s main function, other data is more local: information synthesized from parameters passed to a closure inside a for_each. The program works now by making various clones of data. But it seems like all that copying is not necessary and that a better understanding of ownership would result in less clutter. I’d like some advice on how to clean things up.


#14

cannot move out of captured outer variable in an FnMut closure

An FnMut closure is a closure that is anticipated to be called multiple times (usually as part of some iteration). Captured variables within the FnMut closure are necessary to call the FnMut object. If you move one of them out to somewhere else, then the FnMut can no longer be called on the next iteration!


#15

OK, well this is the specific code. This code is inside the main function (within a loop). handle is in the scope of main() and is the variable that presumably is the problem.

        .and_then(move |o| {
            print!("{}", str::from_utf8(&o.stdout).unwrap());
            Timeout::new(Duration::from_secs(duration), &handle).unwrap()
        })

This is the cannot move out of captured outer variable in an FnMut closure error

Can you explain what “move out of captured outer variable” means? The message just doesn’t make sense to me.


#17

My guess (based on your earlier post) is that you have something like this:

let handle = /* ... */;
incoming.for_each(move |(addr, req)| {
    let duration = something;
    blahblah.and_then(move |o| {
        print!("{}", str::from_utf8(&o.stdout).unwrap());
        Timeout::new(Duration::from_secs(duration), &handle).unwrap()
    })
})

What the compiler sees is more like (pseudocode):

let handle = /* ... */;
incoming.for_each(__Closure1 { __h1: handle })

struct __Closure1 { __h1: Handle }
impl FnMut for __Closure1 {
    fn call_mut(&mut self, (addr, req)) {
        let duration = something;
        blahblah.and_then(__Closure2 { __h2: self.__h1, __d1: duration })
                 // can't move self.__h1!   ^^^^^^^^^
                 // because self is &mut!
    }
}

struct __Closure2 { __h2: Handle, __d1: Duration }
impl FnOnce for __Closure2 {
    fn call_once(self, o) {
        print!("{}", str::from_utf8(&o.stdout).unwrap());
        Timeout::new(Duration::from_secs(self.__d1), &self.__h2).unwrap()
    }
}

The handle gets moved into the environment (self) of __Closure1. Inside __Closure1, you are asking for the handle to be moved again into __Closure2, but because __Closure1 is FnMut (i.e. it could be called multiple times), you only get &mut access to the environment, so you aren’t allowed to move things out of the environment as that would invalidate the environment for future invocations. Conceptually it’s the same reason you aren’t allowed to write code such as:

let handle: Handle = /* ... */;
loop {
    use_handle(handle); // oops, handle is gone now
                        // what will happen on the next iteration!?
}

The trick is to make a clone of handle within __Closure1 (which I think you’ve already discovered). This clone will be called on each invocation of __Closure1, bypassing the problem.


#18

This would be possible if you capture a reference to data from main(), and the capturing closure is moved to another thread. The current thread may exit before that other thread runs, and therefore the locals you’ve captured a reference to will be gone by then. That’s the gist, and is why you’ll see some functions requiring that a closure passed to them has 'static lifetime bounds.

With Core::run, however, the closure you pass doesn’t need to 'static since that call doesn’t return until the future is resolved. But, you may be using other APIs that require 'static.

By the way, it would be easier if you showed the entire code when talking about a snippet - it makes it easier to see what you’re working with, rather than needing to assume and/or scroll up and down this thread :slight_smile:


#19

There was some subtlety here I was missing out on: The “body” of the for_each() isn’t just a simple loop. It’s a closure that’s doing some captures and moves of its own. Given what you’ve described, the error message makes more sense. To recap:

Implementors of the FnMut trait borrow a mutable reference to the scope’s “environment”, via the &mut self parameter. The individual variables are “captures”, available via self. But attempting to move one of those fields (the 2nd closure’s capture) is a no go. My understanding here is a little fuzzier, but it sounds like the move would transfer ownership of part of the environment elsewhere. Once the “receiving function” finished, the moved value would go out of scope and be removed, essentially making that part of the environment invalid…


#20

Right, sorry about that. I’ll put the code next to the questions.


#21

@Fylwind did a nice job desugaring the closures. To recap, here’s his desugaring:

let handle = /* ... */;
incoming.for_each(__Closure1 { __h1: handle })

struct __Closure1 { __h1: Handle }
impl FnMut for __Closure1 {
    fn call_mut(&mut self, (addr, req)) {
        let duration = something;
        blahblah.and_then(__Closure2 { __h2: self.__h1, __d1: duration })
                 // can't move self.__h1!   ^^^^^^^^^
                 // because self is &mut!
    }
}

struct __Closure2 { __h2: Handle, __d1: Duration }
impl FnOnce for __Closure2 {
    fn call_once(self, o) {
        print!("{}", str::from_utf8(&o.stdout).unwrap());
        Timeout::new(Duration::from_secs(self.__d1), &self.__h2).unwrap()
    }
}

You have nested closures (a closure that creates another closure internally). The outer closure is the FnMut one because it’s invoked multiple times, and therefore needs to stay valid across those multiple invocations. For each invocation of the outer closure, a new inner one is created.

Note how the inner closure takes a Handle by value - that will move it into this inner closure from wherever it’s coming from. In this case, that Handle is coming from the outer closure via self, where self refers to the outer closure:

blahblah.and_then(__Closure2 { __h2: self.__h1, __d1: duration })

If this were allowed, then the outer closure would no longer be valid since its __h1 value is moved into the inner closure, and we couldn’t call the outer closure more than once.


#22

Yes, I think I understand. To clarify, the final disposition of self.__h1 (the handle), were it allowed to move, is that it would be removed at the end of the execution of Closure2. And this is what would make the outer closure’s environment invalid and hence uncallable more than once.


#23

It actually doesn’t matter what happens to __h1 after it’s moved out of the outer closure - the true problem is the outer closure is no longer callable once that happens since its __h1 field is undefined after the move (out).


#24

Ah, I see. Conceptually, it’s the same as:

let x = String::from("hi");
let y = x;

println!("y = {}", y); // works.
println!("x = {}", x); // won't work. Value moved to y, x is invalid now.

#25

Yeah, pretty much.


#26

Ok, here’s the current code. It works but I still wonder if it’s idiomatic.

It’s annotated with comments where the creation or use of a particular variable seems duplicative. The question is: can this be made more compact or expressive? I’m curious about the idea of passing state through the closures, but am stumped on how to do it.

What we have here is the body of main() with some setup stuff not germaine to the discussion elided.

let matches = /* elided: stuff from the Clap argument processor... */

let access_cmd = matches.value_of("CMD").unwrap();
let duration = matches.value_of("duration").unwrap_or("5").parse::<u64>().unwrap();

/* elided: setup of the core, socket and the socket stream */
let handle = core.handle();

let incoming = incoming.for_each(|(addr, req)| {
    let handle_for_timeout = handle.clone(); // clone, used in future 2
    let handle_for_2nd_cmd = handle.clone(); // cloned again, used in future 3
    let revoke_cmd = String::from(access_cmd); // new, used in future 3

    println!("incoming request from {}", addr);
    // 1
    let grant_then_revoke = Command::new(access_cmd).arg("start")
        .arg(req.addr.to_string()) // req.addr, as a string.
        .output_async(&handle)
        // 2
        .and_then(move |o| {
            print!("{}", str::from_utf8(&o.stdout).unwrap());
            Timeout::new(Duration::from_secs(duration), &handle_for_timeout).unwrap()
        })
        // 3
        .then(move |_| {
            Command::new(revoke_cmd).arg("stop")
                .arg(req.addr.to_string()) // req.addr, again, as a string.
                .output_async(&handle_for_2nd_cmd)
        })
        // 4
        .then(|o| {
            print!("{}", str::from_utf8(&o.unwrap().stdout).unwrap());
            future::ok(())
        });

    handle.spawn(grant_then_revoke);
    future::ok(())
});

drop(core.run(incoming));