Properly Draining Tokio MPSC Channels

I'm using nightly and Tokio 0.2, writing a process manager.

The current challenge that I'm trying to solve is unserialized writes to stdout and stderr by child processes. At present, each child's standard output and error are attached directly to the parent's, so children can write over each other, and the parent process's own logging is not serialized either.

The current plan is to dedicate one task per child process, reading lines from its standard output and error and forwarding them to a channel to be written serially. I'm also implementing a log4rs appender that uses this channel, so that all output is serialized.
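A rough sketch of that fan-in design, as a synchronous analogue using std threads and channels (the real version would use tokio tasks, `tokio::process`, and `tokio::sync::mpsc`; the function and variable names here are illustrative, and the "children" are simulated with in-memory readers):

```rust
use std::io::BufRead;
use std::sync::mpsc;
use std::thread;

// Forward every line from one output stream into the shared channel.
fn forward_lines<R: BufRead>(reader: R, tx: mpsc::Sender<String>) {
    for line in reader.lines().flatten() {
        // A send error just means the writer has already shut down.
        let _ = tx.send(line);
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    // One forwarder per "child process" (simulated with in-memory readers).
    let handles: Vec<_> = vec!["child a: line 1\nchild a: line 2", "child b: line 1"]
        .into_iter()
        .map(|output| {
            let tx = tx.clone();
            thread::spawn(move || forward_lines(output.as_bytes(), tx))
        })
        .collect();
    drop(tx); // only the forwarders hold senders now

    // A single consumer writes all lines, so output never interleaves.
    for line in rx {
        println!("{}", line);
    }
    for h in handles {
        h.join().unwrap();
    }
}
```

The key property is that only one thread (or task) ever touches the real output, so lines from different children can interleave but never tear mid-line.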

use std::error::Error;

use tokio::sync::mpsc::channel;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // create channels for asynchronous logging
    let (sender, mut receiver) = channel::<String>(1024);

    // read all log entries from the channel and write them out
    tokio::spawn(async move {
        while let Some(entry) = receiver.recv().await {
            println!("{}", entry.trim_end());
        }

        receiver.close()
    });

    // initialize logging
    logging::init(sender.clone());

    // send some log events
    log::trace!("TRACED");
    log::debug!("DEBUGGED");
    log::info!("BUT MAYBE INFO");
    log::warn!("DANGERESQUE");
    log::error!("SUPER BAD");

    Ok(())
}

mod logging {
    use log::{LevelFilter, Record};

    use log4rs::append::Append;
    use log4rs::config::{Appender, Config, Logger, Root};
    use log4rs::encode::pattern::PatternEncoder;
    use log4rs::encode::Encode;

    use std::error::Error;

    use log4rs::encode::writer::simple::SimpleWriter;
    use tokio::sync::mpsc::Sender;

    static PATTERN: &'static str = "{d(%Y-%m-%dT%H:%M:%S%.3f)} [{level:5.5}] [{file}:{line}]: {message}{n}";

    pub fn init(output: Sender<String>) {
        let encoder: Box<dyn Encode> = Box::new(PatternEncoder::new(&PATTERN));
        let appender: Box<dyn Append> = Box::new(ChannelAppender { output, encoder });

        log4rs::init_config(
            Config::builder()
                .appender(Appender::builder().build("stderr", appender))
                .logger(Logger::builder().build(env!("CARGO_PKG_NAME"), LevelFilter::Trace))
                .build(Root::builder().appender("stderr").build(LevelFilter::Warn))
                .unwrap(),
        )
        .unwrap();
    }

    #[derive(Debug)]
    struct ChannelAppender {
        output: Sender<String>,
        encoder: Box<dyn Encode>,
    }

    impl Append for ChannelAppender {
        fn append(&self, record: &Record) -> Result<(), Box<dyn Error + Sync + Send>> {
            let mut writer = SimpleWriter(Vec::new());
            self.encoder.encode(&mut writer, record)?;

            let mut output = self.output.clone();

            let _handle = tokio::spawn(async move {
                output.send(String::from_utf8_lossy(writer.0.as_slice()).to_string()).await.unwrap();
            });

            Ok(())
        }

        fn flush(&self) {}
    }
}

This (somewhat obviously) has a race condition. The write to the channel is buffered and asynchronous, so these messages are sent but usually not received before the program exits.

I'm assuming that I need to define the Appender's flush(&self) method to take some action to wait until the channel is drained.

When I attempt to self.output.clone().close(), nothing happens. When I add a thread::sleep, I see what I expect. Unfortunately, that confirms the race condition: the receiving task isn't able to drain the channel before the program exits.

How do I close the sender properly in this case, ensuring that the receiver is drained when flush is called?

The spawn function returns a JoinHandle, which is itself a future. Store it in a variable and await it at the end of main, so that main waits for the receiver task to finish. Make sure to drop all senders before awaiting, as otherwise it will wait forever.

Hmm. Is there a good design pattern for this? I feel like this is probably a common use-case. It might not be straightforward to prevent Sender from being cloned, and I think that the actual appender is stored with a static lifetime due to internal workings of log.

I would say that this is literally a design pattern (albeit a nameless one):

  • implement workers as channel.into_iter().for_each(...), such that a worker stops naturally when the channel is closed
  • keep track of the senders and make sure they are properly dropped (yes, this needs some deliberate design, but you get clean shutdown in the return)
  • join all the workers in the end, to make sure all messages are processed.
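In the synchronous world, those three bullets come out to something like this (std-only sketch; the names are illustrative):

```rust
use std::sync::mpsc;
use std::thread;

// The pattern in miniature: spawn producers, drop every Sender,
// then join, so that when this returns all messages were processed.
fn fan_in(n_producers: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();

    // The worker stops naturally: iterating a Receiver ends when every
    // Sender has been dropped and the queue has been drained.
    let worker = thread::spawn(move || rx.into_iter().collect::<Vec<_>>());

    // Keep track of the senders: one clone per producer thread.
    let producers: Vec<_> = (0..n_producers)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || tx.send(format!("message {}", i)).unwrap())
        })
        .collect();

    // Deliberately drop the original Sender once the clones are handed out.
    drop(tx);

    // Join everything: producers first, then the worker.
    for p in producers {
        p.join().unwrap();
    }
    worker.join().unwrap()
}

fn main() {
    let mut messages = fan_in(3);
    messages.sort();
    println!("{:?}", messages);
}
```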

I happen to have a blog post on the topic: https://matklad.github.io/2019/08/23/join-your-threads.html. It talks about synchronous API, but the async world should work roughly the same (though you might need to explicitly join the task, as drop can't be async)

Thank you! I'll have to try this out.

I'm probably also going to do something where I wrap my Sender in an Arc<Mutex<T>> and create some Handle type that I keep around in main that, when dropped, closes the Sender. I might have to do some homework to ensure that the Sender gets dropped before the Receiver, though, right?

I'll look through the documentation and try to determine what Drop does on each value, to figure out:

  • what happens when Sender is dropped
  • what happens when Receiver is dropped

Hopefully the answer is that the Receiver consumes everything left in the channel and then returns.

The memory won't necessarily be deallocated until both are dropped, but you won't be able to send more items once you drop the receiver.
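Both shutdown behaviours are easy to verify with a tiny std-channel experiment (tokio's mpsc behaves the same way in this respect, with recv() returning None instead of an error):

```rust
use std::sync::mpsc;

fn main() {
    // Dropping all Senders: the Receiver drains what is buffered, then ends.
    let (tx, rx) = mpsc::channel::<i32>();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx);
    assert_eq!(rx.recv(), Ok(1));
    assert_eq!(rx.recv(), Ok(2));
    assert!(rx.recv().is_err()); // channel closed and empty

    // Dropping the Receiver: any further send fails immediately.
    let (tx, rx) = mpsc::channel::<i32>();
    drop(rx);
    assert!(tx.send(1).is_err());

    println!("both shutdown behaviours confirmed");
}
```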

Yeah. My main goal is that when main returns, all messages have been consumed by the Receiver, so that I don't lose log messages. Maybe I need to close the Sender first and then somehow await the receiver's termination, which should happen once the sender has been closed and all remaining messages have been consumed.

:thinking: