[tokio] Event loop hang when dropping tokio signal handler


#1

I’m writing a terminal application that listens to stdin and to signals asynchronously using tokio. However, I’m observing a hang when dropping the signal handler with the following code. If you run this example, you’ll be greeted with a blank screen. Pressing o will cause “hello” to be echoed to the screen. Ctrl+C will terminate the event loop and end the program (in raw mode Ctrl+C arrives on stdin as byte 3 instead of raising SIGINT, which is what the Event::Key(3) arm handles). Your terminal will probably be left corrupted too; just run reset to fix it. However, if you uncomment the line marked FIXME in main, Ctrl+C still ends the event stream, but the event loop keeps running forever and the program never exits. I would expect the behavior to stay the same.

extern crate bytes;
extern crate futures;
extern crate libc;
extern crate termion;
extern crate tokio;
extern crate tokio_fs;
extern crate tokio_io;
extern crate tokio_process;
extern crate tokio_signal;

use std::io::{self, Write};
use std::process::{Command, Stdio};

use bytes::BytesMut;
use futures::sync::mpsc;
use termion::raw::{RawTerminal, IntoRawMode};
use termion::screen::AlternateScreen;
use tokio::prelude::*;
use tokio_io::codec::{Decoder, FramedRead};
use tokio_process::CommandExt;
use tokio_signal::unix::Signal;

pub struct AsyncStdout(AlternateScreen<RawTerminal<tokio_fs::Stdout>>);

impl Write for AsyncStdout {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()
    }
}

impl AsyncWrite for AsyncStdout {
    fn shutdown(&mut self) -> io::Result<Async<()>> {
        self.0.shutdown()
    }
}

struct ByteCodec;

impl Decoder for ByteCodec {
    type Item = u8;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if buf.is_empty() {
            Ok(None)
        } else {
            // This is inefficient, but it probably doesn't matter since this is interactive input.
            Ok(Some(buf.split_to(1)[0]))
        }
    }
}

enum Event {
    Key(u8),
    Signal(libc::c_int),
}

fn main() {
    let done = futures::lazy(|| {
        let stdin = FramedRead::new(tokio_fs::stdin(), ByteCodec).map(Event::Key);
        let signals = Signal::new(libc::SIGWINCH)
            .flatten_stream()
            .map(Event::Signal);

        let mut stdout = AsyncStdout(AlternateScreen::from(tokio_fs::stdout().into_raw_mode().unwrap()));
        stdout.flush().unwrap();

        // FIXME: uncomment to observe hang
        // let stdin = stdin.select(signals);

        Ok((stdin, stdout))
    }).and_then(|(events, _stdout)| {
        std::mem::forget(_stdout);

        let mut process = Command::new("cat")
            .stdin(Stdio::piped())
            .spawn_async()
            .unwrap();

        let stdin = process.stdin().take().unwrap();
        let stdin = tokio_io::codec::FramedWrite::new(stdin, tokio_io::codec::BytesCodec::new());
        let (core_stdin_tx, core_stdin_rx) = mpsc::unbounded();

        tokio::spawn(core_stdin_rx.forward(stdin.sink_map_err(|e| panic!("{}", e))).map(|_| ()));
        tokio::spawn(process.map(|_| ()).map_err(|e| panic!("{}", e)));

        events.map_err(|e| panic!("{}", e)).for_each(move |event| {
            match event {
                Event::Key(3) => {
                    // Terminate event loop
                    return Err(());
                }
                Event::Key(b'o') => {
                    core_stdin_tx.unbounded_send(String::from("hello").as_bytes().into()).unwrap();
                }
                _ => (),
            }
            Ok(())
        })
    });

    tokio::run(done);
}

It’s a fair amount of code to go through, but I’m having trouble reducing it any further while still reproducing the issue. For example, the following code works as expected: any lines entered on stdin are echoed back, and any signals are printed as well. Entering exit finishes the event stream, and the event loop completes.

extern crate futures;
extern crate libc;
extern crate tokio;
extern crate tokio_fs;
extern crate tokio_io;
extern crate tokio_signal;

use futures::{Future, Stream};
use tokio_io::codec::{FramedRead, LinesCodec};
use tokio_signal::unix::{Signal, SIGINT};

#[derive(Debug, PartialEq, Eq)]
enum Event {
    Line(String),
    Signal(libc::c_int),
}

fn main() {
    let done = futures::lazy(|| {
        let stdin = FramedRead::new(tokio_fs::stdin(), LinesCodec::new()).map(|l| Event::Line(l));
        let signals = Signal::new(SIGINT)
            .flatten_stream()
            .map(|s| Event::Signal(s));
        Ok(stdin.select(signals))
    }).and_then(|events| {
        events
        .map_err(|e| panic!("{}", e))
        .for_each(|e| {
            if e == Event::Line("exit".into()) {
                return Err(());
            }

            println!("{:?}", e);

            Ok(())
        })
    });

    tokio::run(done);
}
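As I understand futures 0.1, select on two streams interleaves their items as each becomes ready and only finishes once both streams have finished; returning Err(()) from for_each is what ends the loop early. As a rough std-only analogy (not tokio code; the thread-per-source setup and the name run_event_loop are illustrative assumptions), the same shape looks like two producer threads feeding one channel:

```rust
use std::sync::mpsc;
use std::thread;

// Mirrors the Event enum above; the signal number is a plain i32 here
// (an assumption) so the sketch does not need the libc crate.
#[derive(Debug, PartialEq, Eq, Clone)]
enum Event {
    Line(String),
    Signal(i32),
}

/// Hypothetical helper: merges two event sources into one channel and
/// consumes events until Line("exit"), returning everything seen before it.
fn run_event_loop(lines: Vec<&'static str>, signals: Vec<i32>) -> Vec<Event> {
    let (tx, rx) = mpsc::channel();

    // Each producer thread plays the role of one stream passed to select.
    let line_tx = tx.clone();
    let line_producer = thread::spawn(move || {
        for l in lines {
            // Ignore send errors: the consumer may already have stopped.
            let _ = line_tx.send(Event::Line(l.to_string()));
        }
    });
    let signal_producer = thread::spawn(move || {
        for s in signals {
            let _ = tx.send(Event::Signal(s));
        }
    });

    let mut seen = Vec::new();
    // Without an "exit" line, this loop ends only after BOTH producers have
    // dropped their senders, like a merged stream ending when both inputs end.
    for event in rx.iter() {
        if event == Event::Line("exit".into()) {
            break; // plays the role of returning Err(()) from for_each
        }
        seen.push(event);
    }
    line_producer.join().unwrap();
    signal_producer.join().unwrap();
    seen
}
```

Items from a single source keep their order, but the interleaving between sources is not deterministic.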

#2

I think the cat process is still hanging around. Note that tokio::run(my_fut) does not return as soon as my_fut completes: it keeps the runtime running while any background tasks (i.e. ones started with tokio::spawn()) are still active.

So, you probably want to do something like the following:

let stdin = process.stdin().take().unwrap();
let stdin = tokio_io::codec::FramedWrite::new(stdin, tokio_io::codec::BytesCodec::new());
let (core_stdin_tx, core_stdin_rx) = mpsc::unbounded();

tokio::spawn(core_stdin_rx.forward(stdin.sink_map_err(|e| panic!("{}", e))).map(|_| ()));

// Create an explicit oneshot channel to reap the `cat` process
let (stop_tx, stop_rx) = futures::sync::oneshot::channel::<()>();
// oneshot::Sender::send consumes the sender, so keep it in an Option
// that the FnMut closure below can take() from exactly once.
let mut stop_tx = Some(stop_tx);
tokio::spawn(process.select2(stop_rx).map(|_| ()).map_err(|_| ()));

events.map_err(|e| panic!("{}", e)).for_each(move |event| {
    match event {
        Event::Key(3) => {
            // Signal completion to reap the `cat` process, then terminate the event loop
            if let Some(tx) = stop_tx.take() {
                let _ = tx.send(());
            }
            return Err(());
        }
        Event::Key(b'o') => {
            core_stdin_tx.unbounded_send(String::from("hello").as_bytes().into()).unwrap();
        }
        _ => (),
    }
    Ok(())
})
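The point above about tokio::run has a close std analogy: std::thread::scope (stable since Rust 1.63) also does not return until every thread spawned inside it has finished, even when the scope's own closure is already done. The sketch below is only that analogy, not tokio code, and run_with_background_worker is a hypothetical name. The background thread finishes once the last Sender is dropped, much as the forward task ends when core_stdin_tx goes away; leaking the Sender instead would make the scope hang, the same way a spawned task that never completes keeps tokio::run from returning.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: runs a "main" body plus one background worker.
/// thread::scope only returns after every thread spawned in it has finished.
fn run_with_background_worker(messages: &[&str]) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();
    let mut forwarded = Vec::new();

    thread::scope(|s| {
        // Background worker: drains the channel until every Sender is dropped.
        s.spawn(|| {
            for msg in rx {
                forwarded.push(msg);
            }
        });

        // "Main" body: send a few messages, then finish.
        for m in messages {
            tx.send(m.to_string()).unwrap();
        }
        // Dropping the last Sender lets the worker's loop end. If tx were
        // leaked instead (e.g. with std::mem::forget), the scope would never return.
        drop(tx);
    });

    forwarded
}
```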

#3

Unfortunately, I don’t think I can use the oneshot for my use case. The actual process I’m using for this project, xi-core, does some cleanup once its stdin is closed, so I can’t just reap it. I’m relying on the fact that dropping the handle to the child’s stdin should also make the process terminate. Furthermore, I don’t think the child hanging is the issue, since the only difference between the working and the non-working code is merging the parent’s signal stream into the parent’s stdin stream.


#4

Why do you think dropping the handle to stdin will terminate the process? The Child handle is owned by the future you spawned; if the child does not exit, neither will the loop.

I tried this locally, and I’m able to repro your scenario with the code as-is but if I add the oneshot channel, it exits reliably.
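For the shape of the oneshot approach without futures, here is a blocking, polling sketch using only std: it races the child's own exit against a stop message, and on stop it kills and then reaps the child. It is an illustrative stand-in for process.select2(stop_rx), not how tokio-process implements it; wait_or_stop is a hypothetical name and a Unix-like system is assumed. Killing skips whatever shutdown work the child would do on EOF, which is the concern raised about xi-core earlier in the thread.

```rust
use std::io;
use std::process::{Child, ExitStatus};
use std::sync::mpsc::{Receiver, TryRecvError};
use std::thread;
use std::time::Duration;

/// Hypothetical helper (assumes a Unix-like system): waits until the child
/// exits on its own or a stop message arrives; on stop, kills and reaps it.
fn wait_or_stop(mut child: Child, stop_rx: Receiver<()>) -> io::Result<ExitStatus> {
    loop {
        // try_wait reaps the child if it has already exited.
        if let Some(status) = child.try_wait()? {
            return Ok(status);
        }
        match stop_rx.try_recv() {
            // Either an explicit stop or a dropped Sender ends the wait,
            // similar to a dropped oneshot Sender resolving its receiver.
            Ok(()) | Err(TryRecvError::Disconnected) => {
                // Ignore the error: the child may have exited in the meantime.
                let _ = child.kill();
                // Reap the child so it does not stay behind as a zombie.
                return child.wait();
            }
            Err(TryRecvError::Empty) => thread::sleep(Duration::from_millis(10)),
        }
    }
}
```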


#5

xi-core, like cat, will run indefinitely until stdin is closed. The docs for std::process::ChildStdin indicate that the file handle to stdin is closed on drop, and I believe tokio-process is intended to work the same way.
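The ChildStdin behaviour can be checked with std::process alone. This sketch assumes a Unix-like system with cat on the PATH (cat_exits_when_stdin_dropped is a hypothetical name). It shows cat still running while the write end of its stdin pipe is open, and exiting once that handle is dropped; try_wait is used so the "still running" check does not block.

```rust
use std::io::{self, Write};
use std::process::{Command, Stdio};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper (assumes `cat` is on PATH): returns
/// (still_running_before_drop, exited_after_drop).
fn cat_exits_when_stdin_dropped() -> io::Result<(bool, bool)> {
    let mut child = Command::new("cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::null())
        .spawn()?;

    // Take the handle out of `child`, so nothing else will close the pipe.
    let mut stdin = child.stdin.take().expect("stdin was piped");
    stdin.write_all(b"hello")?;
    thread::sleep(Duration::from_millis(50));
    // While the write end is open, cat keeps waiting for more input.
    let running_before = child.try_wait()?.is_none();

    // Dropping ChildStdin closes the pipe; cat reads EOF and exits.
    drop(stdin);

    // Poll (for up to 5 seconds) until the exit is observed and reaped.
    let deadline = Instant::now() + Duration::from_secs(5);
    let mut exited_after = false;
    while Instant::now() < deadline {
        if child.try_wait()?.is_some() {
            exited_after = true;
            break;
        }
        thread::sleep(Duration::from_millis(10));
    }
    if !exited_after {
        // Clean up instead of leaking the process if it did not exit.
        child.kill()?;
        child.wait()?;
    }
    Ok((running_before, exited_after))
}
```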

And yeah, the oneshot will work to terminate the process, but I’m not sure that’s a clean exit?


#6

What I observe (on Linux) is that Ctrl-C does close cat’s stdin and cat exits right after, but it becomes a zombie (i.e. defunct) because the parent process never wait()s on it. This presumably prevents the Child future from completing, so the loop hangs around.
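On Linux, the zombie state can be observed directly in /proc. This sketch is Linux-only: it assumes /proc is mounted and cat is on the PATH, and both function names are hypothetical. It closes cat's stdin, watches the process reach state Z (zombie), and then checks that its /proc entry disappears only after the parent calls wait().

```rust
use std::fs;
use std::io;
use std::process::{Command, Stdio};
use std::thread;
use std::time::Duration;

/// Hypothetical helper (Linux-only assumption): reads the state letter of
/// /proc/<pid>/stat, e.g. 'S' (sleeping) or 'Z' (zombie); None once it is gone.
fn proc_state(pid: u32) -> Option<char> {
    let stat = fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?;
    // The command name is in parentheses and may contain spaces,
    // so the state is the first field after the last ')'.
    let after_comm = &stat[stat.rfind(')')? + 1..];
    after_comm.trim_start().chars().next()
}

/// Hypothetical helper: returns (state after stdin is closed, state after wait()).
fn zombie_until_reaped() -> io::Result<(char, Option<char>)> {
    let mut child = Command::new("cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::null())
        .spawn()?;
    let pid = child.id();

    // Closing stdin makes cat see EOF and exit...
    drop(child.stdin.take());

    // ...but until the parent waits on it, it lingers as a zombie ('Z').
    let mut state = 'S';
    for _ in 0..500 {
        match proc_state(pid) {
            Some('Z') => {
                state = 'Z';
                break;
            }
            Some(s) => state = s,
            None => break,
        }
        thread::sleep(Duration::from_millis(10));
    }

    // Reaping the child removes its /proc entry.
    child.wait()?;
    Ok((state, proc_state(pid)))
}
```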

So probably what you want then is:

tokio::spawn(core_stdin_rx.forward(stdin.sink_map_err(|e| panic!("{}", e))).then(|_| {
    process.map(|_| ()).map_err(|e| panic!("{}", e))
}));

Once the forward completes and the stdin sink is dropped (closing cat’s stdin), we wait on the process by map’ing it. This replaces the separate tokio::spawn(process.map(|_| ()).map_err(|e| panic!("{}", e))); future.

This seems to work.
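The ordering in this fix (finish forwarding, drop the child's stdin, then wait on the child) can also be sketched synchronously with a thread and a std channel. This is only an analogy for forward(...).then(|_| process), not tokio code; forward_then_wait and run_cat are hypothetical names, and cat is assumed to be on the PATH.

```rust
use std::io::{self, Read, Write};
use std::process::{Child, Command, ExitStatus, Stdio};
use std::sync::mpsc::{self, Receiver};
use std::thread::{self, JoinHandle};

/// Hypothetical helper: forwards every message from `rx` into the child's
/// stdin; only after the channel closes does it drop stdin and wait.
fn forward_then_wait(mut child: Child, rx: Receiver<Vec<u8>>) -> JoinHandle<io::Result<ExitStatus>> {
    thread::spawn(move || -> io::Result<ExitStatus> {
        let mut stdin = child.stdin.take().expect("stdin was piped");
        // The loop ends once every Sender has been dropped.
        for chunk in rx {
            stdin.write_all(&chunk)?;
        }
        drop(stdin); // close the pipe so the child sees EOF
        child.wait() // reap the child so it does not linger as a zombie
    })
}

/// Hypothetical driver (assumes `cat` is on PATH): returns cat's exit status
/// and whatever it echoed back.
fn run_cat(messages: &[&str]) -> io::Result<(ExitStatus, String)> {
    let mut child = Command::new("cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;
    let mut stdout = child.stdout.take().expect("stdout was piped");

    let (tx, rx) = mpsc::channel();
    let handle = forward_then_wait(child, rx);
    for m in messages {
        tx.send(m.as_bytes().to_vec()).expect("forwarder is running");
    }
    drop(tx); // like core_stdin_tx being dropped when the event loop ends

    // Reading to EOF completes once cat exits and closes its stdout.
    let mut echoed = String::new();
    stdout.read_to_string(&mut echoed)?;
    let status = handle.join().expect("forwarder panicked")?;
    Ok((status, echoed))
}
```

Because the wait happens on the same thread that owns the child's stdin, the child cannot be left unreaped once its input has been closed.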


#7

Aha! Thank you for the help!


#8

Ok, coming back to this, I’m not sure that it’s completely solved. Notice that in the “working” example, the signals aren’t actually reported. If I use signals.select(stdin) instead, the signals are reported, but entering “exit” no longer ends the program.

I’m seeing this behavior in the more complicated example as well.


#9

Which is the “working” example? Can you paste the code you’re having trouble with?


#10

Sorry, I meant the second example in the original post.


#11

Hmm, that example works for me:

That works with either Ok(stdin.select(signals)) or Ok(signals.select(stdin)) being returned, the former being what you wrote in your first post.

Can you elaborate a bit more on what you’re seeing with it?


#12

With that sequence of events, I get the same output for both stdin.select(signals) and signals.select(stdin).

hello
Line("hello")
there
Line("there")
^C                 (I need to press enter here...)
Line("")
Signal(2)
exit

Strange that I have to press enter to get the Ctrl-C to register. I notice that you’re on Linux, and I’m on a Mac. I wonder if that has something to do with it?


#13

Oh yeah, I had to press Enter as well. I wonder if tokio_fs has something to do with it, because running a Signal stream future on its own works as expected.