Merging Disparate Streams using SelectAll

I'm playing around with Tokio 0.2 with nightly to get async/await support. So far, I've been able to get things working pretty well. I'm now running into an issue with using multiple different streams and trying to merge them together.

Cargo.toml

[package]
name = "pman"
version = "0.1.0"
authors = ["Naftuli Kay <me@naftuli.wtf>"]
edition = "2018"

[dependencies]
futures-util-preview = "0.3.0-alpha.18"
tokio = "0.2.0-alpha.2"
tokio-net = { version = "0.2.0-alpha.2", features = ["signal"] }

main.rs

use futures_util::{future, FutureExt};
use futures_util::stream::SelectAll;
use futures_util::stream::StreamExt;

use std::process;

use tokio_net::signal::unix::{Signal, SignalKind};
use tokio::stream::Stream;

#[derive(Debug)]
pub enum Signals {
    Hangup,
    User1,
    User2,
    Terminate,
    Quit,
    Interrupt,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("BEHOLD, I AM {}", process::id());

    let sighup = Signal::new(SignalKind::hangup()).unwrap();

    let mut stream = SelectAll::new();

    stream.push(Signal::new(SignalKind::hangup()).unwrap().map(|_| Signals::Hangup));

    stream.for_each(|s| {
        match s {
            Signals::Interrupt | Signals::Quit | Signals::Terminate => {
                println!("AND WE'RE OUT: {:?}", s);
                process::exit(0);
            },
            _ => {
                println!("Received other signal: {:?}", s);
            }
        }

        future::ready(())
    }).await;

    Ok(())
}

The above example compiles just fine.

However, what I'm trying to do is merge n streams together so that I can have a single signal handler location to dispatch events from.

If I attempt to add the following line:

stream.push(Signal::new(SignalKind::interrupt()).unwrap().map(|_| Signals::Interrupt));

I now get a compilation error:

error[E0308]: mismatched types
  --> src/main.rs:29:67
   |
29 |     stream.push(Signal::new(SignalKind::interrupt()).unwrap().map(|_| Signals::Interrupt));
   |                                                                   ^^^^^^^^^^^^^^^^^^^^^^ expected closure, found a different closure
   |
   = note: expected type `[closure@src/main.rs:28:64: 28:83]`
              found type `[closure@src/main.rs:29:67: 29:89]`
   = note: no two closures, even if identical, have the same type
   = help: consider boxing your closure and/or using it as a trait object

I'm trying to understand what this means.

The issue is that each stream only returns () so I'm attempting to map these to an enum variant and then select_all over all of them, but the compiler isn't happy with these closures.

Which type do I need here? I'm assuming I just need a Stream<Signals> but I'm new to async code in Rust. I conceptually understand but I don't understand from the type system's perspective.

SelectAll is parametrized with the stream type S, which must be uniform for all contained streams, while the map(|_| ...) call used in stream.push(...) produces a different type for each invocation, because the contained closure is of the different unnameable type each time. The error you get says as much, indicating the difference between closure types.

What you're trying to do is achievable with an enum and a bit of boilerplate. Warning: perhaps there's a better way, I'm still finding my bearings in the brave new world of Pins and Contexts. Also, I couldn't compile the original code for lack of Signal::new()—my version of tokio-net contains a bare fn signal(). Adjust accordingly.

use std::pin::Pin;
use std::task::{Context, Poll};

enum SigWrap {
    Hangup(Signal),
    Interrupt(Signal),
}

impl Stream for SigWrap {
    type Item = Signals;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match &mut *self {
            SigWrap::Hangup(ref mut s) => match Pin::new(s).poll_next(cx) {
                Poll::Ready(Some(_)) => Poll::Ready(Some(Signals::Hangup)),
                Poll::Ready(None) => Poll::Ready(None),
                Poll::Pending => Poll::Pending,
            },
            SigWrap::Interrupt(ref mut s) => match Pin::new(s).poll_next(cx) {
                Poll::Ready(Some(_)) => Poll::Ready(Some(Signals::Interrupt)),
                Poll::Ready(None) => Poll::Ready(None),
                Poll::Pending => Poll::Pending,
            },
        }
    }
}

...

    stream.push(SigWrap::Hangup(signal(SignalKind::hangup()).unwrap()));
    stream.push(SigWrap::Interrupt(signal(SignalKind::interrupt()).unwrap()));

1 Like

I was able to get something working via a recommendation by talchas on the Rust IRC by using boxed() on each Map instance.

Working source code here:

use futures_util::future;
use futures_util::stream::SelectAll;
use futures_util::stream::StreamExt;

use std::process;

use std::pin::Pin;
use tokio::stream::Stream;
use tokio_net::signal::unix::{Signal, SignalKind};

#[derive(Debug)]
pub enum Signals {
    Hangup,
    User1,
    User2,
    Terminate,
    Quit,
    Interrupt,
    Child,
}

pub type SignalStream = Pin<Box<dyn Stream<Item = Signals> + Send>>;

impl Signals {
    fn as_signal(&self) -> Signal {
        match self {
            Signals::Hangup => Signal::new(SignalKind::hangup()).unwrap(),
            Signals::User1 => Signal::new(SignalKind::user_defined1()).unwrap(),
            Signals::User2 => Signal::new(SignalKind::user_defined2()).unwrap(),
            Signals::Terminate => Signal::new(SignalKind::terminate()).unwrap(),
            Signals::Quit => Signal::new(SignalKind::quit()).unwrap(),
            Signals::Interrupt => Signal::new(SignalKind::interrupt()).unwrap(),
            Signals::Child => Signal::new(SignalKind::child()).unwrap(),
        }
    }

    /// Convert the given signal into a stream.
    pub fn stream(&self) -> SignalStream {
        match self {
            Signals::Hangup => self.as_signal().map(|_| Signals::Hangup).boxed(),
            Signals::User1 => self.as_signal().map(|_| Signals::User1).boxed(),
            Signals::User2 => self.as_signal().map(|_| Signals::User2).boxed(),
            Signals::Terminate => self.as_signal().map(|_| Signals::Terminate).boxed(),
            Signals::Quit => self.as_signal().map(|_| Signals::Quit).boxed(),
            Signals::Interrupt => self.as_signal().map(|_| Signals::Interrupt).boxed(),
            Signals::Child => self.as_signal().map(|_| Signals::Child).boxed(),
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("BEHOLD, I AM {}", process::id());

    let mut signals = SelectAll::new();

    [
        Signals::Hangup,
        Signals::User1,
        Signals::User2,
        Signals::Terminate,
        Signals::Quit,
        Signals::Interrupt,
        Signals::Child,
    ]
    .iter()
    .map(|s| s.stream())
    .for_each(|s| signals.push(s));

    signals
        .for_each(|s| {
            match s {
                Signals::Interrupt | Signals::Quit | Signals::Terminate => {
                    println!("AND WE'RE OUT: {:?}", s);
                    process::exit(0);
                }
                _ => {
                    println!("Received other signal: {:?}", s);
                }
            }

            future::ready(())
        })
        .await;

    Ok(())
}

This can certainly be improved usability-wise, but it does work.