I'm playing around with Tokio 0.2 with nightly to get async/await support. So far, I've been able to get things working pretty well. I'm now running into an issue with using multiple different streams and trying to merge them together.
Cargo.toml
[package]
name = "pman"
version = "0.1.0"
authors = ["Naftuli Kay <me@naftuli.wtf>"]
edition = "2018"
[dependencies]
futures-util-preview = "0.3.0-alpha.18"
tokio = "0.2.0-alpha.2"
tokio-net = { version = "0.2.0-alpha.2", features = ["signal"] }
main.rs
use futures_util::{future, FutureExt};
use futures_util::stream::SelectAll;
use futures_util::stream::StreamExt;
use std::process;
use tokio_net::signal::unix::{Signal, SignalKind};
use tokio::stream::Stream;
#[derive(Debug)]
pub enum Signals {
Hangup,
User1,
User2,
Terminate,
Quit,
Interrupt,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("BEHOLD, I AM {}", process::id());
let sighup = Signal::new(SignalKind::hangup()).unwrap();
let mut stream = SelectAll::new();
stream.push(Signal::new(SignalKind::hangup()).unwrap().map(|_| Signals::Hangup));
stream.for_each(|s| {
match s {
Signals::Interrupt | Signals::Quit | Signals::Terminate => {
println!("AND WE'RE OUT: {:?}", s);
process::exit(0);
},
_ => {
println!("Received other signal: {:?}", s);
}
}
future::ready(())
}).await;
Ok(())
}
The above example compiles just fine.
However, what I'm trying to do is merge n streams together so that I can have a single signal handler location to dispatch events from.
If I attempt to add the following line:
stream.push(Signal::new(SignalKind::interrupt()).unwrap().map(|_| Signals::Interrupt));
I now get a compilation error:
error[E0308]: mismatched types
--> src/main.rs:29:67
|
29 | stream.push(Signal::new(SignalKind::interrupt()).unwrap().map(|_| Signals::Interrupt));
| ^^^^^^^^^^^^^^^^^^^^^^ expected closure, found a different closure
|
= note: expected type `[closure@src/main.rs:28:64: 28:83]`
found type `[closure@src/main.rs:29:67: 29:89]`
= note: no two closures, even if identical, have the same type
= help: consider boxing your closure and/or using it as a trait object
I'm trying to understand what this means.
The issue is that each stream only returns ()
so I'm attempting to map
these to an enum variant and then select_all
over all of them, but the compiler isn't happy with these closures.
Which type do I need here? I'm assuming I just need a Stream<Signals>
but I'm new to async code in Rust. I conceptually understand but I don't understand from the type system's perspective.