I'm writing a process manager using nightly Rust and Tokio 0.2.
The problem I'm currently trying to solve is unserialized writes to stdout and stderr by child processes. At present, their standard output and error are simply attached to the parent's output and error, so child processes can write over each other, and the parent process's own logging is not serialized either.
The current plan is to dedicate one task per child process, reading lines from its standard output and error and forwarding them to a channel to be written serially. I'm also implementing a log4rs appender that uses this channel, so that all output is serialized.
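To make the forwarding step concrete, here is a minimal sketch of it. It deliberately uses std::sync::mpsc and a blocking BufRead instead of Tokio so that it stands alone; forward_lines and the tag prefix are hypothetical names for illustration, not part of my actual code.

```rust
use std::io::BufRead;
use std::sync::mpsc::Sender;

/// Hypothetical helper: reads `source` line by line and forwards each line,
/// prefixed with `tag`, to the shared channel. Returns the number of lines
/// that were forwarded.
pub fn forward_lines<R: BufRead>(
    source: R,
    tag: &str,
    output: &Sender<String>,
) -> std::io::Result<usize> {
    let mut count = 0;
    for line in source.lines() {
        let line = line?;
        // A send error means the receiving side is gone, so stop forwarding.
        if output.send(format!("[{}] {}", tag, line)).is_err() {
            break;
        }
        count += 1;
    }
    Ok(count)
}
```

In the real program, each child's stdout and stderr would get its own forwarder like this, and only the single receiver would ever write to the terminal. The code I actually have so far covers the logging side of that channel: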
use std::error::Error;
use tokio::sync::mpsc::channel;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // create channels for asynchronous logging
    let (sender, mut receiver) = channel::<String>(1024);

    // read all log entries from the channel and write them out
    tokio::spawn(async move {
        while let Some(entry) = receiver.recv().await {
            println!("{}", entry.trim_end());
        }
        receiver.close()
    });

    // initialize logging
    logging::init(sender.clone());

    // send some log events
    log::trace!("TRACED");
    log::debug!("DEBUGGED");
    log::info!("BUT MAYBE INFO");
    log::warn!("DANGERESQUE");
    log::error!("SUPER BAD");

    Ok(())
}
mod logging {
    use log::{LevelFilter, Record};
    use log4rs::append::Append;
    use log4rs::config::{Appender, Config, Logger, Root};
    use log4rs::encode::pattern::PatternEncoder;
    use log4rs::encode::Encode;
    use std::error::Error;
    use log4rs::encode::writer::simple::SimpleWriter;
    use tokio::sync::mpsc::Sender;

    static PATTERN: &'static str = "{d(%Y-%m-%dT%H:%M:%S%.3f)} [{level:5.5}] [{file}:{line}]: {message}{n}";

    pub fn init(output: Sender<String>) {
        let encoder: Box<dyn Encode> = Box::new(PatternEncoder::new(&PATTERN));
        let appender: Box<dyn Append> = Box::new(ChannelAppender { output, encoder });

        log4rs::init_config(
            Config::builder()
                .appender(Appender::builder().build("stderr", appender))
                .logger(Logger::builder().build(env!("CARGO_PKG_NAME"), LevelFilter::Trace))
                .build(Root::builder().appender("stderr").build(LevelFilter::Warn))
                .unwrap(),
        )
        .unwrap();
    }

    #[derive(Debug)]
    struct ChannelAppender {
        output: Sender<String>,
        encoder: Box<dyn Encode>,
    }

    impl Append for ChannelAppender {
        fn append(&self, record: &Record) -> Result<(), Box<dyn Error + Sync + Send>> {
            let mut writer = SimpleWriter(Vec::new());
            self.encoder.encode(&mut writer, record)?;

            let mut output = self.output.clone();
            let _handle = tokio::spawn(async move {
                output.send(String::from_utf8_lossy(writer.0.as_slice()).to_string()).await.unwrap();
            });

            Ok(())
        }

        fn flush(&self) {}
    }
}
This (somewhat obviously) has a race condition. The write to the channel is buffered and not synchronous: append spawns a task for each send and returns immediately, so these messages are sent but usually not received before the program exits.
I'm assuming that I need to define the Appender's flush(&self) method to take some action to wait until the channel is drained.
When I attempt to call self.output.clone().close(), nothing happens. When I add a thread::sleep, I see the output I expect, which confirms the race condition: the receiver isn't able to drain and close the channel before the program exits.
How do I close the sender properly in this case, ensuring that the receiver is drained when flush is called?
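To show the behavior I'm after, here is a minimal sketch with plain threads and std::sync::mpsc rather than Tokio and log4rs. spawn_writer and shutdown are hypothetical names used only for this illustration. The idea is that shutdown drops the sender and then blocks until the writer has consumed everything that was queued.

```rust
use std::io::Write;
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical helper: starts a writer thread that owns the receiver and
/// writes every entry to `sink`. The thread hands the sink back when the
/// channel is closed and fully drained.
pub fn spawn_writer<W: Write + Send + 'static>(mut sink: W) -> (Sender<String>, JoinHandle<W>) {
    let (sender, receiver) = channel::<String>();
    let handle = thread::spawn(move || {
        // `iter()` ends only once all senders are dropped and the queue is empty.
        for entry in receiver.iter() {
            let _ = writeln!(sink, "{}", entry.trim_end());
        }
        sink
    });
    (sender, handle)
}

/// Hypothetical helper: drops the given sender, then waits for the writer
/// thread to finish draining the channel. Any other clones of the sender
/// must already have been dropped, or this blocks until they are.
pub fn shutdown<W>(sender: Sender<String>, handle: JoinHandle<W>) -> W {
    drop(sender);
    handle.join().expect("writer thread panicked")
}
```

The difficulty in my actual code, as far as I can tell, is that the appender installed in the global logger keeps its own Sender alive, so the channel never closes this way, and flush(&self) is synchronous while the send happens in a spawned task.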