I'm trying to figure out how to send tracing crate events to a Tokio mpsc channel. In other words, instead of printing them in a terminal/writing to a file, I'd like to send them (formatted or unformatted) to a channel so I can then process them async later (in the example, I'd just print them again from the channel).
So far, I found this example that implements make_my_great_writer(), but I'm not sure how to repurpose it to actually do something with the event.
Any help would be much appreciated ;-). Thank you.
use tokio::time::{sleep, Duration};
use tracing::{debug, info, warn, Level};
// use tracing_subscriber::FmtSubscriber; // trait
use tokio::io::AsyncWriteExt;
#[tokio::main(flavor = "multi_thread")]
async fn main() {
let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(100);
let handle = tokio::task::spawn(async move {
let mut stdout = tokio::io::stdout();
while let Some(out) = rx.recv().await {
stdout.write_all(&out).await.unwrap();
stdout.flush().await.unwrap();
}
});
tx.send("test".as_bytes().to_vec()).await.unwrap();
// let subscriber = FmtSubscriber::builder().with_max_level(Level::TRACE).finish();
// let subscriber = tracing_subscriber::fmt().with_writer(std::io::stdout).with_max_level(Level::TRACE).finish();
// https://docs.rs/tracing-subscriber/latest/tracing_subscriber/fmt/trait.MakeWriter.html
fn make_my_great_writer() -> impl std::io::Write {
std::io::stderr()
}
let subscriber = tracing_subscriber::fmt().with_writer(make_my_great_writer).finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
info!("testing");
warn!("one");
debug!("two");
info!("three");
}
This is trickier than it sounds. It touches on an arguable flaw of async Rust: although "everyone knows" that println! and tracing::info! etc. can block, and that you shouldn't call blocking functions in async code, we all often call those functions in async code anyway. (Try logging into a UNIX pipe with tracing_subscriber and you'll have problems ...) You are using a bounded channel, so it can fill up and sending can block. You need to use an unbounded channel or drop log messages when the channel is full. There's already a crate that does this (tracing_appender). Once you choose how to resolve the blocking issue you need to create a type that wraps the channel and implements the Write trait.
You can have the MakeWriter return a struct that writes into the channel:
use std::io::{self, Write};
use tokio::sync::mpsc;
use tracing_subscriber::MakeWriter;
struct ChannelMakeWriter {
chan: mpsc::Sender<Vec<u8>>,
}
impl<'a> MakeWriter<'a> for ChannelMakeWriter {
type Writer = ChannelWriter;
fn make_writer(&'a self) -> Self::Writer {
ChannelWriter {
sender: self.chan.clone(),
buffer: Vec::new(),
}
}
}
struct ChannelWriter {
sender: mpsc::Sender<Vec<u8>>,
buffer: Vec<u8>,
}
impl Write for ChannelWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buffer.extend(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
// This function is called once the Subscriber is done with the writer
impl Drop for ChannelWriter {
fn drop(&mut self) {
let data = std::mem::take(&mut self.buffer);
// We can't do anything if the channel is closed
// This will only block if there isn't capacity,
// so use an unbounded channel
let _ = self.sender.blocking_send(data);
}
}
Then set the writer on the SubscriberBuilder and it should work.
tracing-appender is probably not necessary, because writing to the ChannelWriter won't block for any I/O.
Thank you very much @cod10129 ! It's almost there - I just need to figure out how to fix the missing Fn trait implement.
use std::io::{self, Write};
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use tracing::{debug, info, warn, Level};
use tracing_subscriber::fmt::MakeWriter;
struct ChannelMakeWriter {
chan: mpsc::Sender<Vec<u8>>,
}
impl<'a> MakeWriter<'a> for ChannelMakeWriter {
type Writer = ChannelWriter;
fn make_writer(&'a self) -> Self::Writer {
ChannelWriter {
sender: self.chan.clone(),
buffer: Vec::new(),
}
}
}
struct ChannelWriter {
sender: mpsc::Sender<Vec<u8>>,
buffer: Vec<u8>,
}
impl Write for ChannelWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buffer.extend(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
// This function is called once the Subscriber is done with the writer
impl Drop for ChannelWriter {
fn drop(&mut self) {
let data = std::mem::take(&mut self.buffer);
// We can't do anything if the channel is closed
// This will only block if there isn't capacity,
// so use an unbounded channel
let _ = self.sender.blocking_send(data);
}
}
#[tokio::main(flavor = "multi_thread")]
async fn main() {
let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(100);
let handle = tokio::task::spawn(async move {
let mut stdout = tokio::io::stdout();
while let Some(out) = rx.recv().await {
stdout.write_all(&out).await.unwrap();
stdout.flush().await.unwrap();
}
});
tx.send("test".as_bytes().to_vec()).await.unwrap();
let make_writer = ChannelMakeWriter { chan: tx };
let writer = ChannelMakeWriter::make_writer(&make_writer);
// let subscriber = tracing_subscriber::FmtSubscriber::builder().with_writer(writer).with_max_level(Level::TRACE).finish();
// let subscriber = tracing_subscriber::fmt().with_writer(writer).with_max_level(Level::TRACE).finish();
// the trait `Fn()` is not implemented for `ChannelWriter`, which is required by `for<'writer> ChannelWriter: MakeWriter<'writer>`
// where
// W2: for<'writer> MakeWriter<'writer> + 'static,
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `SubscriberBuilder::<N, E, F, W>::with_writer`
let subscriber = tracing_subscriber::fmt().with_writer(writer).finish();
// the trait `Fn()` is not implemented for `ChannelWriter`, which is required by `for<'writer> ChannelWriter: MakeWriter<'writer>`
// let subscriber = tracing_subscriber::fmt::SubscriberBuilder::default().with_max_level(tracing::Level::DEBUG).with_writer(writer).with_line_number(true).init();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
info!("testing");
warn!("one");
debug!("two");
info!("three");
}
the Fn family of traits can only be directly used in nightly with #![feature(fn_traits)], it is NOT implementable in stable rust.
the error message:
"Fn is not implemented for ..."
is misleading, what you are doing wrong is, you are supposed to pass the MakeWriter to SubscriberBuilder::with_writer(), NOT the MakeWriter::Writer.
the error message really should be:
for <'w> MakeWriter<'w> is not implemented for ...
but, because there's a convenient blanket implementation of MakeWriter, so that you can directly use closures(or functions) as argument to SubscriberBuilder::with_writer(). unfortunately, this confused the compiler.
the blanket implementation looks like:
impl<'a, F, W> MakeWriter<'a> for F where F: Fn() -> W, W: Write {
...
}