I'm writing a terminal application that uses tokio to listen to stdin and to signals asynchronously. However, I'm observing a hang when dropping the signal handler with the following code. If you run this example, you'll be greeted with a blank screen. Pressing o echoes "hello" to the screen. Ctrl+C terminates the event loop and ends the program (your terminal will probably be left corrupted; just run reset to fix it). However, if you uncomment the line below the FIXME in main, Ctrl+C still ends the stream, but the event loop keeps running forever and the program never exits. I would expect the behavior to stay the same.

extern crate bytes;
extern crate futures;
extern crate libc;
extern crate termion;
extern crate tokio;
extern crate tokio_fs;
extern crate tokio_io;
extern crate tokio_process;
extern crate tokio_signal;

use std::io::{self, Write};
use std::process::{Command, Stdio};

use bytes::BytesMut;
use futures::sync::mpsc;
use termion::raw::{RawTerminal, IntoRawMode};
use termion::screen::AlternateScreen;
use tokio::prelude::*;
use tokio_io::codec::{Decoder, FramedRead};
use tokio_process::CommandExt;
use tokio_signal::unix::Signal;

pub struct AsyncStdout(AlternateScreen<RawTerminal<tokio_fs::Stdout>>);

impl Write for AsyncStdout {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()
    }
}

impl AsyncWrite for AsyncStdout {
    fn shutdown(&mut self) -> io::Result<Async<()>> {
        self.0.shutdown()
    }
}

struct ByteCodec;

impl Decoder for ByteCodec {
    type Item = u8;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if buf.is_empty() {
            Ok(None)
        } else {
            // This is inefficient, but it probably doesn't matter since this is interactive input.
            Ok(Some(buf.split_to(1)[0]))
        }
    }
}

enum Event {
    Key(u8),
    Signal(libc::c_int),
}

fn main() {
    let done = futures::lazy(|| {
        let stdin = FramedRead::new(tokio_fs::stdin(), ByteCodec).map(Event::Key);
        let signals = Signal::new(libc::SIGWINCH)
            .flatten_stream()
            .map(Event::Signal);
        let mut stdout = AsyncStdout(AlternateScreen::from(tokio_fs::stdout().into_raw_mode().unwrap()));
        stdout.flush().unwrap();

        // FIXME: uncomment to observe hang
        // let stdin = stdin.select(signals);

        Ok((stdin, stdout))
    }).and_then(|(events, _stdout)| {
        std::mem::forget(_stdout);

        let mut process = Command::new("cat")
            .stdin(Stdio::piped())
            .spawn_async()
            .unwrap();
        let stdin = process.stdin().take().unwrap();
        let stdin = tokio_io::codec::FramedWrite::new(stdin, tokio_io::codec::BytesCodec::new());
        let (core_stdin_tx, core_stdin_rx) = mpsc::unbounded();
        tokio::spawn(core_stdin_rx.forward(stdin.sink_map_err(|e| panic!("{}", e))).map(|_| ()));
        tokio::spawn(process.map(|_| ()).map_err(|e| panic!("{}", e)));

        events.map_err(|e| panic!("{}", e)).for_each(move |event| {
            match event {
                Event::Key(3) => {
                    // Terminate event loop
                    return Err(());
                }
                Event::Key(b'o') => {
                    core_stdin_tx.unbounded_send(String::from("hello").as_bytes().into()).unwrap();
                }
                _ => (),
            }
            Ok(())
        })
    });

    tokio::run(done);
}
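
To pin down what I mean by "the behavior should stay the same", here is a rough, tokio-free model of the for_each handler above. It is only a sketch under assumptions: run_events and Outcome are names I made up for illustration, a plain iterator stands in for the event stream, and i32 stands in for libc::c_int. It says nothing about how tokio or tokio-signal behave internally; it only shows that interleaving signal events should not change when the loop terminates.

```rust
// Simplified model of the event handling in `main` above (no tokio involved).
// `run_events` and `Outcome` are hypothetical names used only in this sketch.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Event {
    Key(u8),
    Signal(i32), // stands in for libc::c_int
}

#[derive(Debug, PartialEq, Eq)]
struct Outcome {
    /// Bytes that would have been forwarded to the child's stdin.
    forwarded: Vec<u8>,
    /// Whether the loop stopped because byte 3 (Ctrl+C in raw mode) was seen.
    terminated: bool,
}

fn run_events<I: IntoIterator<Item = Event>>(events: I) -> Outcome {
    let mut forwarded = Vec::new();
    for event in events {
        match event {
            // Mirrors `return Err(())` in the real handler: stop processing.
            Event::Key(3) => return Outcome { forwarded, terminated: true },
            // Mirrors sending "hello" to the child process.
            Event::Key(b'o') => forwarded.extend_from_slice(b"hello"),
            // Every other key and every signal is ignored.
            _ => (),
        }
    }
    Outcome { forwarded, terminated: false }
}

fn main() {
    let keys_only = vec![Event::Key(b'o'), Event::Key(3), Event::Key(b'o')];
    // 28 is SIGWINCH on Linux; the exact number does not matter here.
    let with_signals = vec![
        Event::Key(b'o'),
        Event::Signal(28),
        Event::Key(3),
        Event::Key(b'o'),
    ];
    println!("{:?}", run_events(keys_only));
    println!("{:?}", run_events(with_signals));
}
```

In this model both inputs stop at the Ctrl+C byte after forwarding "hello" once, which is the outcome I expected from the real program with the FIXME line uncommented.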

The full example is a fair amount of code to go through, but I'm having trouble reducing it any further while still reproducing the issue. For example, the following code works as expected: any lines entered on stdin are echoed back, and any signals are printed as well. Entering exit finishes the event stream, and the event loop completes.

extern crate futures;
extern crate libc;
extern crate tokio;
extern crate tokio_fs;
extern crate tokio_io;
extern crate tokio_signal;

use futures::{Future, Stream};
use tokio_io::codec::{FramedRead, LinesCodec};
use tokio_signal::unix::{Signal, SIGINT};

#[derive(Debug, PartialEq, Eq)]
enum Event {
    Line(String),
    Signal(libc::c_int),
}

fn main() {
    let done = futures::lazy(|| {
        let stdin = FramedRead::new(tokio_fs::stdin(), LinesCodec::new()).map(|l| Event::Line(l));
        let signals = Signal::new(SIGINT)
            .flatten_stream()
            .map(|s| Event::Signal(s));

        Ok(stdin.select(signals))
    }).and_then(|events| {
        events
            .map_err(|e| panic!("{}", e))
            .for_each(|e| {
                if e == Event::Line("exit".into()) {
                    return Err(());
                }
                println!("{:?}", e);
                Ok(())
            })
    });

    tokio::run(done);
}
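
For comparison, the shutdown behavior I'm expecting from a merged stream can be modeled with the standard library alone. This is just an analogy, not a description of tokio: two threads and a std::sync::mpsc channel stand in for the two streams combined by select, i32 stands in for libc::c_int, and run_until_exit is a made-up name. The point is that once the consumer stops and drops its end, both sources wind down and the program exits.

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Debug, PartialEq, Eq)]
enum Event {
    Line(String),
    Signal(i32), // stands in for libc::c_int
}

/// Merges two event sources into one channel and consumes events until the
/// line "exit" is seen. Returns the events observed before "exit".
/// (Hypothetical helper for this sketch only.)
fn run_until_exit(lines: Vec<String>, signals: Vec<i32>) -> Vec<Event> {
    let (tx, rx) = mpsc::channel();

    // Source 1: lines, standing in for the framed stdin stream.
    let line_tx = tx.clone();
    let line_src = thread::spawn(move || {
        for line in lines {
            // `send` fails once the receiver has been dropped; stop producing.
            if line_tx.send(Event::Line(line)).is_err() {
                break;
            }
        }
    });

    // Source 2: signal numbers, standing in for the signal stream.
    let signal_src = thread::spawn(move || {
        for signal in signals {
            if tx.send(Event::Signal(signal)).is_err() {
                break;
            }
        }
    });

    let mut seen = Vec::new();
    // The iterator also ends on its own once both senders are gone.
    for event in rx.iter() {
        if event == Event::Line("exit".into()) {
            break;
        }
        seen.push(event);
    }

    // Dropping the merged receiver tells both sources to stop.
    drop(rx);
    line_src.join().unwrap();
    signal_src.join().unwrap();
    seen
}

fn main() {
    let lines = vec!["hello".to_string(), "exit".to_string(), "ignored".to_string()];
    // 2 is SIGINT on Linux; the exact number does not matter here.
    let seen = run_until_exit(lines, vec![2]);
    println!("{:?}", seen);
}
```

How the signal events interleave with the lines depends on thread scheduling, so only the order of the lines themselves is guaranteed in this model.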