I'm implementing a small server based on Tokio's UDP streams that performs an action asynchronously. I'm having trouble adding additional actions. This seems to be an application of chaining futures together, which I haven't yet grasped.
Currently, the code receives an object from the stream and does this:
- Prints the object out.
- Delay for a bit (but go back to listening for incoming packets, so that's that async part)
- When the delay is over, print the object out again.
The code is this:
let b_stream = b_stream.for_each(|(addr, req)| {
println!("{} start {} (from {})", access_cmd, &req.addr, addr);
let revoke_cmd = format!("{} stop {}", access_cmd, req.addr);
let timeout = Timeout::new(Duration::from_secs(duration), &handle).unwrap();
let cb = timeout.map(move |_| {
println!("{}", revoke_cmd)
}).map_err(|err| println!("{:?}", err));
handle.spawn(cb);
future::ok(())
});
Question #1: this code works, but is it idiomatic and use correct technique?
Question #2: how would I extend this to be:
- Invoke a child process asynchronously.
- Once that process is complete, log its output and start the
Timeout
. - When the
Timeout
is done invoke another process asynchronously. - When that second process is done, log its output.
In sum, the code inside the loop, would manage the creation of three futures in sequence (the first command, the Timeout
, the second command), not just one (the Timeout). The tokio-process
crate has the "run a process asynchronously" functionality, so I tried this out as an initial stab:
let output = Command::new(access_cmd).arg("stop").arg(format!("{}", &req.addr))
.output_async(&handle);
handle.spawn(output);
But I got these errors:
error[E0271]: type mismatch resolving `<tokio_process::OutputAsync as futures::Future>::Item == ()`
--> src/main.rs:76:16
|
76 | handle.spawn(output);
| ^^^^^ expected struct `std::process::Output`, found ()
|
= note: expected type `std::process::Output`
= note: found type `()`
error[E0271]: type mismatch resolving `<tokio_process::OutputAsync as futures::Future>::Error == ()`
--> src/main.rs:76:16
|
76 | handle.spawn(output);
| ^^^^^ expected struct `std::io::Error`, found ()
|
= note: expected type `std::io::Error`
= note: found type `()`
Not a very auspicious start. I'm aware that some structs define types, but I don't know how that comes into play here. The spawn()
takes a future, and the output_async()
function returns an OutputAsync
, which implements the Future
trait. I'm missing something here. Beyond the error message, I have these questions:
-
Is
handle.spawn()
the right way to cause that command to run asynchronously? Or does the act of creating it do that implicitly? I don't think so, because omitting that line and then running the program doesn't cause the command to run. -
If you do spawn it, how do you get access to the future when it's complete? I tried tacking on an
and_then()
with a closure after theoutput_async()
, but several variations of that didn't compile. I don't know if that's the right construct to use, here. -
And finally, back to the original question, how do you launch the next future (start the
Timeout
in this case), when the command's future has arrived and, after theTimeout
future has arrives, launch the final future, which is the running of the second command.
Any advice on the techniques needed to structure the code to do this is appreciated.
Chuck