Chaining futures


#1

I’m implementing a small server based on Tokio’s UDP streams that performs an action asynchronously. I’m having trouble adding more actions. This seems to be an application of chaining futures together, which I haven’t yet grasped.

Currently, the code receives an object from the stream and does this:

  1. Prints the object out.
  2. Delays for a bit (but goes back to listening for incoming packets, so that’s the async part).
  3. When the delay is over, prints the object out again.

The code is this:

let b_stream = b_stream.for_each(|(addr, req)| {
    println!("{} start {} (from {})", access_cmd, &req.addr, addr);
    let revoke_cmd = format!("{} stop {}", access_cmd, req.addr);
    let timeout = Timeout::new(Duration::from_secs(duration), &handle).unwrap();
    let cb = timeout.map(move |_| {
        println!("{}", revoke_cmd)
    }).map_err(|err| println!("{:?}", err));
    handle.spawn(cb);
    future::ok(())
});

Question #1: this code works, but is it idiomatic, and does it use the correct technique?

Question #2: how would I extend this to be:

  1. Invoke a child process asynchronously.
  2. Once that process is complete, log its output and start the Timeout.
  3. When the Timeout is done invoke another process asynchronously.
  4. When that second process is done, log its output.

In short, the code inside the loop would manage the creation of three futures in sequence (the first command, the Timeout, the second command), not just one (the Timeout). The tokio-process crate provides the “run a process asynchronously” functionality, so I tried this as an initial stab:

    let output = Command::new(access_cmd).arg("stop").arg(format!("{}", &req.addr))
        .output_async(&handle);
    handle.spawn(output);

But I got these errors:

error[E0271]: type mismatch resolving `<tokio_process::OutputAsync as futures::Future>::Item == ()`
  --> src/main.rs:76:16
   |
76 |         handle.spawn(output);
   |                ^^^^^ expected struct `std::process::Output`, found ()
   |
   = note: expected type `std::process::Output`
   = note:    found type `()`

error[E0271]: type mismatch resolving `<tokio_process::OutputAsync as futures::Future>::Error == ()`
  --> src/main.rs:76:16
   |
76 |         handle.spawn(output);
   |                ^^^^^ expected struct `std::io::Error`, found ()
   |
   = note: expected type `std::io::Error`
   = note:    found type `()`

Not a very auspicious start. I’m aware that some traits define associated types, but I don’t know how that comes into play here. spawn() takes a future, and the output_async() function returns an OutputAsync, which implements the Future trait. I’m missing something here. Beyond the error message, I have these questions:

  • Is handle.spawn() the right way to cause that command to run asynchronously? Or does the act of creating it do that implicitly? I don’t think so, because omitting that line and then running the program doesn’t cause the command to run.

  • If you do spawn it, how do you get access to the result when it’s complete? I tried tacking an and_then() with a closure onto the output_async(), but several variations of that didn’t compile. I don’t know if that’s the right construct to use here.

  • And finally, back to the original question: how do you launch the next future (starting the Timeout, in this case) once the command’s future has completed, and then, once the Timeout future has completed, launch the final future, which runs the second command?

Any advice on the techniques needed to structure the code to do this is appreciated.

Chuck


#2

Did you ever end up looking at that socks5 proxy example from your other thread? I glanced at it, and it seemed like a pretty good example of non-trivial usage (albeit it was all in one module but whatever, it’s an example).

As for your compilation error with spawn, it’s because spawn requires its Future argument to have no output and no error (i.e. Future<Item = (), Error = ()>). You’re passing it a future whose Item and Error are not (): OutputAsync has Item = std::process::Output and Error = std::io::Error. This is because spawn doesn’t return anything, so the required signature conveys that the future is responsible for dealing with its own result and any errors that may pop up.
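To see why the associated types matter, here is a toy, synchronous model of the constraint. MiniFuture, Ready, Map, MapErr and spawn_like are hypothetical names, not the futures 0.1 API; they only mimic how Item and Error flow through map and map_err, and what a spawn-style bound demands.

```rust
// Toy model, NOT the futures 0.1 API: every name here is hypothetical and
// everything runs synchronously.
trait MiniFuture {
    type Item;
    type Error;
    fn run(self) -> Result<Self::Item, Self::Error>;
}

// An already-finished computation, standing in for something like OutputAsync.
struct Ready<T, E>(Result<T, E>);

impl<T, E> MiniFuture for Ready<T, E> {
    type Item = T;
    type Error = E;
    fn run(self) -> Result<T, E> {
        self.0
    }
}

// Like Future::map: transforms Item, leaves Error alone.
struct Map<F, C>(F, C);

impl<F, C, U> MiniFuture for Map<F, C>
where
    F: MiniFuture,
    C: FnOnce(F::Item) -> U,
{
    type Item = U;
    type Error = F::Error;
    fn run(self) -> Result<U, F::Error> {
        self.0.run().map(self.1)
    }
}

// Like Future::map_err: transforms Error, leaves Item alone.
struct MapErr<F, C>(F, C);

impl<F, C, G> MiniFuture for MapErr<F, C>
where
    F: MiniFuture,
    C: FnOnce(F::Error) -> G,
{
    type Item = F::Item;
    type Error = G;
    fn run(self) -> Result<F::Item, G> {
        self.0.run().map_err(self.1)
    }
}

// Like Handle::spawn: nobody receives the outcome, so Item and Error must be ().
fn spawn_like<F: MiniFuture<Item = (), Error = ()>>(f: F) -> bool {
    f.run().is_ok()
}
```

Passing a Ready<String, std::io::Error> straight to spawn_like is rejected with a type-mismatch error much like the E0271 above; wrapping it in Map and MapErr whose closures return () makes it fit. In futures 0.1 terms that would look something like output.map(|o| ...).map_err(|e| ...) before handing the future to handle.spawn.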

Based on the tokio-process docs, it seems like output_async should run the process on the event loop. Are you sure you’re not dropping the returned OutputAsync? That will kill the child.

As for your and_then troubles, can you paste what didn’t compile? and_then should be the way to chain a future onto the successful result of another future. It does take a closure, but that closure has to return something that converts into a future (IntoFuture) with the same Error type as the first future. If you just want to transform the result of a future with a closure, use map: it runs your closure on the future’s value and returns that mapping operation as its own future, which is something you can then chain with and_then.
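std’s Result has combinators with the same shape, which makes the difference easy to try out synchronously. In this sketch run_step is a hypothetical stand-in for an operation that can fail: map’s closure returns a plain value, while and_then’s closure returns another Result (for futures, another future) with the same error type.

```rust
// run_step is a hypothetical operation that fails on an empty name.
fn run_step(name: &str) -> Result<String, String> {
    if name.is_empty() {
        Err("empty step name".to_string())
    } else {
        Ok(format!("{} done", name))
    }
}

// map: the closure returns a plain value; the error type stays String.
fn output_len(name: &str) -> Result<usize, String> {
    run_step(name).map(|out| out.len())
}

// and_then: the closure returns another Result (for futures, another future),
// and its error type must match the first step's error type.
fn two_steps(first: &str, second: &str) -> Result<String, String> {
    run_step(first).and_then(|_| run_step(second))
}
```

If the first step fails, and_then never calls its closure, so the error from the first step is what comes out.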


#3

I did look at the socks5 proxy example. Even though it’s small, there was too much going on there for me to get anything useful out of it. I’m trying to tackle the concepts in really small bites with my own project, but so far even this is rough sledding.

I now understand what you mean about OutputAsync not being compatible with spawn(). By comparing two things, the signature of tokio_core::reactor::Handle's spawn() and tokio_process::OutputAsync's implementation of the Future trait, one can see that the latter’s Item and Error types indeed do not match what the former requires.

As far as actually running the process, the docs do seem to imply that the process should get spawned when you call output_async(). My command was a shell script that included a long sleep so that I had time to check for it with ps. I didn’t see it running, so something subtle is going on here; maybe something else needs to be done. The last line of the program does call drop, on the result of running the core. It looks like this:

drop(core.run(b_stream));

b_stream is the stream built from the tokio UdpSocket created at the beginning of the program.

The and_then stuff: I tried a bunch of things and didn’t save the errors for all of them. Here is a simple one though:

    let output = Command::new("echo").arg("hello").arg(">>").arg("/Users/chuck/src/knockd/test.out")
        .output_async(&handle).and_then(|_| {
            println!("done with command");
            future::ok(())
        });

This yields:

error[E0271]: type mismatch resolving `<futures::AndThen<tokio_process::OutputAsync, futures::FutureResult<(), std::io::Error>, [closure@src/main.rs:79:45: 82:14]> as futures::Future>::Error == ()`
  --> src/main.rs:89:16
   |
89 |         handle.spawn(output);
   |                ^^^^^ expected struct `std::io::Error`, found ()
   |
   = note: expected type `std::io::Error`
   = note:    found type `()`

It might be that map needs to be combined with and_then somehow, but my mental model is getting kind of hazy.

Chuck


#4

Can you show your full code where you’re trying to spawn the process in response to a udp packet?

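As for and_then: the closure you pass it has to return a future (your future::ok(()) qualifies), so that part is fine. The error is about the Error type: and_then keeps the first future’s Error, std::io::Error, while spawn needs Error = (). If the closure only prints, map is simpler (it will run your closure when the output_async future completes), but either way you still need a map_err that handles the io::Error, as your original Timeout code did, before handing the future to spawn.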


#5

Sure. I got a simplified form of it working (spawning the command, waiting a bit, printing “done”) by using the then() combinator. If I understand this correctly, the key to this working in its current form is that the final future (the one that completes immediately and ends the sequence) is a future::ok(()), which is compatible with spawn().

let mut core = Core::new().unwrap();
let handle = core.handle();
let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
let sock = UdpSocket::bind(&addr, &handle).unwrap();
let (_, incoming) = sock.framed(LineCodec).split();

let incoming = incoming.for_each(|(addr, req)| {
    println!("{} seconds {} for {} (from {})", duration, access_cmd, &req.addr, addr);

    let hc = handle.clone();
    let output = Command::new(access_cmd)
        .arg("start").arg(format!("{}", &req.addr))
        .output_async(&handle)
        .then(move |_| {
            println!("start");
            Timeout::new(Duration::from_secs(5), &hc).unwrap()
        })
        /*
        .then(move |_| {
            println!("delayed");
            Command::new("/Users/chuck/src/knockd/test.sh")
                .arg("end").arg(format!("whomp"))
                .output_async(&hc)
        })
        */
        .then(|_| {
            println!("end");
            future::ok(())
        });

    handle.spawn(output);
    future::ok(())
});

You’ll notice that there is a commented-out third future. Trying to get that into the mix ran afoul of the borrow checker. Narrowing the problem down to a single item: how can I make sure the handle is available for passing to output_async()? Notice that I already cloned it once. What’s the technique for passing something like this along, or for ensuring that it’s a reference to something that outlives everything? Is this a case where things get annotated with a lifetime?

Simply uncommenting the block above gives:

error[E0382]: capture of moved value: `hc`
  --> src/main.rs:87:36
   |
79 |             .then(move |_| {
   |                   -------- value moved (into closure) here
...
87 |                     .output_async(&hc)
   |                                    ^^ value captured here after move
   |
   = note: move occurs because `hc` has type `tokio_core::reactor::Handle`, which does not implement the `Copy` trait
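The final .then(|_| future::ok(())) in the code above fits spawn because then, unlike map and and_then, hands its closure the previous step’s whole Result and lets the closure pick brand-new Item and Error types. Here is a synchronous sketch of that idea; finish is a hypothetical helper, and String stands in for whatever the previous step produced.

```rust
use std::io;

// Mimics a final `.then(...)`: it receives the previous step's whole Result,
// success or failure, and returns a Result whose types it chooses freely.
fn finish(previous: Result<String, io::Error>) -> Result<(), ()> {
    match previous {
        Ok(out) => println!("end: {}", out),
        Err(e) => println!("end after error: {}", e),
    }
    Ok(())
}
```

By contrast, previous.map(|_| ()) would still carry io::Error as its error type, which is the same reason the and_then attempt earlier in the thread did not satisfy spawn.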

#6

Handle can be cloned as many times as you want, so use that to your advantage. Make a clone if needed, and store it by value in the closure environment (use move if needed).

There are very few situations where you are able to store a non-'static object inside the closure environment of a Future.
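A minimal sketch of the clone-then-move pattern, assuming a hypothetical Handle whose clone only bumps a reference count (the thread only says tokio_core’s Handle can be cloned freely). Each move closure owns its own clone, mirroring hc and hc2, so the first closure taking its copy does not affect the second.

```rust
use std::rc::Rc;

// Hypothetical stand-in for tokio_core's Handle: cloning only bumps a
// reference count, so one clone per closure costs almost nothing.
#[derive(Clone)]
struct Handle {
    name: Rc<String>,
}

impl Handle {
    fn describe(&self, what: &str) -> String {
        format!("{} on {}", what, self.name)
    }
}

// Mirrors hc/hc2: each `move` closure owns its own clone, so moving one clone
// into the first closure leaves the second one untouched.
fn build_chain(handle: &Handle) -> Vec<Box<dyn FnOnce() -> String>> {
    let hc = handle.clone();
    let hc2 = handle.clone();
    let mut chain: Vec<Box<dyn FnOnce() -> String>> = Vec::new();
    chain.push(Box::new(move || hc.describe("timeout")));
    chain.push(Box::new(move || hc2.describe("second command")));
    chain
}
```

Because the closures own their clones, they hold no borrows, which also satisfies the 'static requirement mentioned above.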


#7

As @Fylwind mentioned, you should clone the Handle and move that clone into a closure. In your case, your first closure ends up moving the handle, thus making it unavailable in your second (commented out) closure.


#8

Right, cloning was what I stumbled on. I needed references to the same thing in a couple of different places, so I cloned enough copies and used “move” to transfer ownership. Specifically: two copies of handle (hc and hc2), an extra copy of the command string (access_cmd cloned as revoke_cmd), and an extra copy of req.addr as req_addr. Is this the technique? It makes sense, but seems strange.

This is what it looks like now:

    let hc = handle.clone();
    let hc2 = handle.clone();
    let revoke_cmd = access_cmd.to_string();
    let req_addr = req.addr.to_string();

    let grant_then_revoke = Command::new(access_cmd).arg("start").arg(req.addr.to_string())
        .output_async(&handle)
        .then(move |_| {
            Timeout::new(Duration::from_secs(duration), &hc).unwrap()
        })
        .then(move |_| {
            Command::new(revoke_cmd).arg("stop").arg(req_addr)
                .output_async(&hc2)
        })
        .then(|_| {
            future::ok(())
        });

I think the next refinement is replacing then with and_then in some places, because and_then lets the sequence stop when a future resolves to an error. I’ll try to make that work and see what I come up with.
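A synchronous sketch of that difference, with a hypothetical step function that logs its name and may fail. The and_then chain stops at the first Err, while the then-style chain runs every step because each closure receives the previous Result either way (then here is a hypothetical free function, not a std method).

```rust
// Records that a step ran; fails when `ok` is false.
fn step(log: &mut Vec<&'static str>, name: &'static str, ok: bool) -> Result<(), String> {
    log.push(name);
    if ok {
        Ok(())
    } else {
        Err(format!("{} failed", name))
    }
}

// Like Future::and_then: later steps run only after a success.
fn chain_and_then(grant_ok: bool) -> Vec<&'static str> {
    let mut log = Vec::new();
    let _ = step(&mut log, "grant", grant_ok)
        .and_then(|_| step(&mut log, "wait", true))
        .and_then(|_| step(&mut log, "revoke", true));
    log
}

// Like Future::then: the closure gets the whole Result, success or failure.
fn then<T, E, U>(previous: Result<T, E>, f: impl FnOnce(Result<T, E>) -> U) -> U {
    f(previous)
}

fn chain_then(grant_ok: bool) -> Vec<&'static str> {
    let mut log = Vec::new();
    let r = step(&mut log, "grant", grant_ok);
    let r = then(r, |_| step(&mut log, "wait", true));
    let _ = then(r, |_| step(&mut log, "revoke", true));
    log
}
```

For a grant/revoke server, the and_then shape avoids running the revoke command when the grant command itself failed.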


#9

You can probably avoid some of the cloning by returning the state (e.g. req) from the futures in the chain - “pass the puck”, so to speak. Your closures receive arguments, which you currently ignore (via the placeholder ‘_’) but if you make a closure return the state it received, it’ll be available to the next future, and so on. This way you’re moving state in and out of the closures/futures in your chain.
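A synchronous sketch of passing the puck, using Result::and_then in place of future combinators. Request, grant, wait and revoke are hypothetical names; each stage takes ownership of the state and hands it back with its own result, so addr is never cloned.

```rust
// Hypothetical per-request state; `addr` mirrors req.addr in the thread.
struct Request {
    addr: String,
}

fn grant(req: Request) -> Result<(Request, String), String> {
    let out = format!("start {}", req.addr);
    Ok((req, out))
}

fn wait(req: Request) -> Result<Request, String> {
    // A real program would wait on a timer here.
    Ok(req)
}

fn revoke(req: Request) -> Result<String, String> {
    Ok(format!("stop {}", req.addr))
}

// The state moves into each closure and back out again with the new result.
fn grant_then_revoke(req: Request) -> Result<(String, String), String> {
    grant(req)
        .and_then(|(req, start_out)| wait(req).map(|req| (req, start_out)))
        .and_then(|(req, start_out)| revoke(req).map(|stop_out| (start_out, stop_out)))
}
```

With futures 0.1 the equivalent would be, roughly, having each closure return its next future followed by .map(move |x| (state, x)), so the state rides along with the result.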


#10

Passing the puck is definitely what I’m after. I’d started to use the closure’s argument in a couple of cases so that, for example, I could print the standard output of the command after it executes. As you’ve pointed out, there’s a further use of the closure argument: passing state along as the computation proceeds.

Using map() seemed like the way to do this: it creates a future that carries the state along. This one example eliminates the need for the req_addr variable in the earlier example:

    .and_then(move |o| {
        print!("{}", str::from_utf8(&o.stdout).unwrap());
        Timeout::new(Duration::from_secs(duration), &hc).unwrap()
    })
    .map(|_| req)
    .then(move |req| {
        Command::new(revoke_cmd).arg("stop").arg(req.unwrap().addr.to_string())
            .output_async(&hc2)
    })

The map() creates a future that resolves to an instance of my state struct, which is passed to the next closure as req. Because then hands its closure the previous future’s whole Result, req arrives as a Result (so it must be unwrapped) and is then available for use.

I’ll see if I can refine this further and eliminate other clones, which will further clean things up.


#11

I tried to transfer the state data through my futures instead of creating a bunch of clones, but got lost in all the attempts at making that work.

At a high level, the problems seem to be:

  • Some of the data is in the scope of main(), which may not outlive the closures. I don’t understand how this is possible, but assuming it is, I don’t know how to say: it’s OK to reference this essentially global data.

  • Using a value more than once is tricky. The use of move presents a conundrum: it moves every variable the closure uses into the closure, making them inaccessible to subsequent closures. You might even “inadvertently” move things that you’ll want to use later. But if you don’t use move, then there are the lifetime-related problems detailed below.

In this program there are 4 futures, an initial one and 3 more chained to the first. Future 1 has access to the data it needs and is not a problem. Futures 2 and 3 are connected with the then or and_then combinators. The closures therein are where the problems lie. The 4th closure needs no data other than the result of the previous future. That’s passed to the closure, so it is not a problem.

This is the data:

  1. access_cmd: A String in scope of main(). Needed twice (1st and 3rd future). Trying to access it in a closure is forbidden because it borrows the value from the main() scope and that may not live long enough. The specific error is borrowed value must be valid for the static lifetime...

  2. o: An Output, received as a closure argument. Needed twice (2nd and 4th futures). This one poses no ownership problem because the combinator hands the value to the closure.

  3. duration: A u64 in the scope of main(). Needed once (2nd future). The closure that uses it can simply be a move closure; since u64 is Copy, the closure gets its own copy of the value. This solution is sufficient because duration is needed nowhere else.

  4. handle: A Handle in the scope of main(). Needed three times (1st, 2nd and 3rd futures). Trying to access it in the closures of the 2nd and 3rd is troublesome: the closure may outlive the current function (main()). Converting the closure to a move closure results in this error message: cannot move out of captured outer variable in an FnMut closure, which I don’t understand. Finally, creating a single clone and attempting to use it in two different closures doesn’t work, because the second closure attempts to capture the value after it has moved into the first closure. The solution here was to create two clones of handle, named hc and hc2, and use them in the two closures that needed them. This seems naive.

  5. req_addr: A String in the scope of the for_each loop. Needed twice (1st and 3rd futures). Using it in the 3rd future’s closure results in the same problem as handle: an earlier closure inadvertently moves the data, so it isn’t accessible later.

So, a host of problems that all seem like they need different solutions. I’m not sure if each one should be solved individually or if there’s a good technique to apply to all of them.
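One approach that covers all five items, sketched with std types only: share read-only data from main() through a reference count, let Copy values like duration be copied, clone cheaply at the top of each call of the outer FnMut closure, and move per-request values such as req_addr into the inner closure. Handle and serve are hypothetical names, the boxed closures stand in for the chained futures, and using Rc assumes a single-threaded executor that does not require Send (Arc would be the choice otherwise).

```rust
use std::rc::Rc;

// Hypothetical stand-in for tokio_core's Handle; cheap to clone.
#[derive(Clone)]
struct Handle(Rc<String>);

// `access_cmd` is read-only data from main(), shared through Rc<str> so that
// per-request copies are cheap; `duration` is a Copy u64. Returns what each
// inner "future" would have run.
fn serve(requests: Vec<String>, access_cmd: Rc<str>, duration: u64, handle: Handle) -> Vec<String> {
    let mut pending: Vec<Box<dyn FnOnce() -> String>> = Vec::new();

    // Outer closure: called once per request (FnMut), like the for_each body.
    // It never gives away its own captures; it clones them per call instead.
    let mut on_request = |req_addr: String| {
        let cmd = Rc::clone(&access_cmd);
        let hc = handle.clone();
        // Inner closure: owns its per-request copies, plus req_addr itself.
        pending.push(Box::new(move || {
            format!("{} stop {} after {}s on {}", cmd, req_addr, duration, hc.0)
        }));
    };

    for req_addr in requests {
        on_request(req_addr);
    }
    pending.into_iter().map(|job| job()).collect()
}
```

The clones are still there, but they are now one cheap reference-count bump per request, made in one predictable place at the top of the outer closure.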


#12

Is it possible to make sure the closures don’t outlive main, such as joining the threads, using scoped threads or by other means?


#13

I’m not familiar with any of those things, so not sure. I went pretty far into the weeds here, so in order to not get further into “XY problem” territory, here’s a restatement:

My tokio-core based application has data that is needed at different stages of a sequence of chained-together futures. Some of it is data from the program’s main function, other data is more local: information synthesized from parameters passed to a closure inside a for_each. The program works now by making various clones of data. But it seems like all that copying is not necessary and that a better understanding of ownership would result in less clutter. I’d like some advice on how to clean things up.


#14

cannot move out of captured outer variable in an FnMut closure

An FnMut closure is a closure that is anticipated to be called multiple times (usually as part of some iteration). Captured variables within the FnMut closure are necessary to call the FnMut object. If you move one of them out to somewhere else, then the FnMut can no longer be called on the next iteration!


#15

OK, well this is the specific code. This code is inside the main function (within a loop). handle is in the scope of main() and is the variable that presumably is the problem.

        .and_then(move |o| {
            print!("{}", str::from_utf8(&o.stdout).unwrap());
            Timeout::new(Duration::from_secs(duration), &handle).unwrap()
        })

This produces the cannot move out of captured outer variable in an FnMut closure error.

Can you explain what “move out of captured outer variable” means? The message just doesn’t make sense to me.


#17

My guess (based on your earlier post) is that you have something like this:

let handle = /* ... */;
incoming.for_each(move |(addr, req)| {
    let duration = something;
    blahblah.and_then(move |o| {
        print!("{}", str::from_utf8(&o.stdout).unwrap());
        Timeout::new(Duration::from_secs(duration), &handle).unwrap()
    })
})

What the compiler sees is more like (pseudocode):

let handle = /* ... */;
incoming.for_each(__Closure1 { __h1: handle })

struct __Closure1 { __h1: Handle }
impl FnMut for __Closure1 {
    fn call_mut(&mut self, (addr, req)) {
        let duration = something;
        blahblah.and_then(__Closure2 { __h2: self.__h1, __d1: duration })
                 // can't move self.__h1!   ^^^^^^^^^
                 // because self is &mut!
    }
}

struct __Closure2 { __h2: Handle, __d1: u64 }
impl FnOnce for __Closure2 {
    fn call_once(self, o) {
        print!("{}", str::from_utf8(&o.stdout).unwrap());
        Timeout::new(Duration::from_secs(self.__d1), &self.__h2).unwrap()
    }
}

The handle gets moved into the environment (self) of __Closure1. Inside __Closure1, you are asking for the handle to be moved again into __Closure2, but because __Closure1 is FnMut (i.e. it could be called multiple times), you only get &mut access to the environment, so you aren’t allowed to move things out of the environment as that would invalidate the environment for future invocations. Conceptually it’s the same reason you aren’t allowed to write code such as:

let handle: Handle = /* ... */;
loop {
    use_handle(handle); // oops, handle is gone now
                        // what will happen on the next iteration!?
}

The trick is to make a clone of handle within __Closure1 (which I think you’ve already discovered). A fresh clone is made on each invocation of __Closure1, and that clone is what gets moved into __Closure2, bypassing the problem.
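A std-only sketch of that fix, mirroring the desugaring above. for_each and Handle are hypothetical stand-ins for Stream::for_each and tokio_core’s Handle: the outer move closure owns handle (like __Closure1) and clones it on every call, so each inner closure (like __Closure2) gets its own copy.

```rust
// Hypothetical stand-ins: `for_each` calls an FnMut once per item, and
// `Handle` is a cheaply cloneable value.
#[derive(Clone, Debug, PartialEq)]
struct Handle(u32);

fn for_each<F: FnMut(u64)>(items: &[u64], mut f: F) {
    for &item in items {
        f(item);
    }
}

fn schedule(handle: Handle, durations: &[u64]) -> Vec<(Handle, u64)> {
    let mut jobs: Vec<Box<dyn FnOnce() -> (Handle, u64)>> = Vec::new();
    let queue = &mut jobs;
    // `move` puts `handle` into the closure's environment, like __Closure1.
    for_each(durations, move |duration| {
        // `move || (handle, duration)` here would move `handle` out of this
        // FnMut closure's environment, which the compiler rejects.
        // A fresh clone per call leaves the environment intact.
        let hc = handle.clone();
        queue.push(Box::new(move || (hc, duration)));
    });
    jobs.into_iter().map(|job| job()).collect()
}
```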


#18

This would be possible if you capture a reference to data from main(), and the capturing closure is moved to another thread. The current thread may exit before that other thread runs, and therefore the locals you’ve captured a reference to will be gone by then. That’s the gist, and is why you’ll see some functions requiring that a closure passed to them has 'static lifetime bounds.

With Core::run, however, the future you pass doesn’t need to be 'static, since that call doesn’t return until the future is resolved. But you may be using other APIs that do require 'static, such as Handle::spawn.
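The 'static reasoning is easiest to see with plain std threads (not tokio-core), which also touches the scoped-thread idea raised earlier. In this sketch, len_via_spawn must move an owned, reference-counted copy into thread::spawn, while len_via_scope can borrow because thread::scope (stable since Rust 1.63) joins its threads before returning. Both function names are hypothetical.

```rust
use std::sync::Arc;
use std::thread;

// thread::spawn demands a 'static closure because the new thread may outlive
// this stack frame, so it cannot hold a plain borrow of `cmd`. Moving in an
// owned, reference-counted copy satisfies the bound.
fn len_via_spawn(cmd: &Arc<String>) -> usize {
    let cmd = Arc::clone(cmd);
    let worker = thread::spawn(move || cmd.len());
    worker.join().unwrap()
}

// thread::scope joins every thread it spawned before returning, so the
// closure may borrow `cmd` directly.
fn len_via_scope(cmd: &str) -> usize {
    thread::scope(|s| s.spawn(|| cmd.len()).join().unwrap())
}
```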

By the way, it would be easier if you showed the entire code when talking about a snippet: it makes it easier to see what you’re working with, rather than needing to assume and/or scroll up and down this thread.


#19

There was some subtlety here I was missing out on: The “body” of the for_each() isn’t just a simple loop. It’s a closure that’s doing some captures and moves of its own. Given what you’ve described, the error message makes more sense. To recap:

An FnMut closure gets a mutable reference to its own “environment” (its captured variables) via the &mut self parameter. The individual variables are “captures”, available as fields of self. But attempting to move one of those fields out (into the 2nd closure’s captures) is a no-go. My understanding here is a little fuzzier, but it sounds like the move would transfer ownership of part of the environment elsewhere, so the next call of the closure would find that part of its environment gone…


#20

Right, sorry about that. I’ll put the code next to the questions.


#21

@Fylwind did a nice job desugaring the closures. To recap, here’s his desugaring:

let handle = /* ... */;
incoming.for_each(__Closure1 { __h1: handle })

struct __Closure1 { __h1: Handle }
impl FnMut for __Closure1 {
    fn call_mut(&mut self, (addr, req)) {
        let duration = something;
        blahblah.and_then(__Closure2 { __h2: self.__h1, __d1: duration })
                 // can't move self.__h1!   ^^^^^^^^^
                 // because self is &mut!
    }
}

struct __Closure2 { __h2: Handle, __d1: u64 }
impl FnOnce for __Closure2 {
    fn call_once(self, o) {
        print!("{}", str::from_utf8(&o.stdout).unwrap());
        Timeout::new(Duration::from_secs(self.__d1), &self.__h2).unwrap()
    }
}

You have nested closures (a closure that creates another closure internally). The outer closure is the FnMut one because it’s invoked multiple times, and therefore needs to stay valid across those multiple invocations. For each invocation of the outer closure, a new inner one is created.

Note how the inner closure takes a Handle by value - that will move it into this inner closure from wherever it’s coming from. In this case, that Handle is coming from the outer closure via self, where self refers to the outer closure:

blahblah.and_then(__Closure2 { __h2: self.__h1, __d1: duration })

If this were allowed, then the outer closure would no longer be valid since its __h1 value is moved into the inner closure, and we couldn’t call the outer closure more than once.