Sending `tracing` events to MPSC Channel

Hi everybody,

I'm trying to figure out how to send events from the tracing crate to a Tokio mpsc channel. In other words, instead of printing them to a terminal or writing them to a file, I'd like to send them (formatted or unformatted) to a channel so I can process them asynchronously later (in the example, I just print them again from the channel).

So far, I have found this example that implements make_my_great_writer(), but I'm not sure how to repurpose it to actually do something with the event.

Any help would be much appreciated ;-). Thank you.

use tokio::time::{sleep, Duration};
use tracing::{debug, info, warn, Level};
// use tracing_subscriber::FmtSubscriber; // trait
use tokio::io::AsyncWriteExt;

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(100);

    let handle = tokio::task::spawn(async move {
        let mut stdout = tokio::io::stdout();

        while let Some(out) = rx.recv().await {
            stdout.write_all(&out).await.unwrap();
            stdout.flush().await.unwrap();
        }
    });

    tx.send("test".as_bytes().to_vec()).await.unwrap();


    // let subscriber = FmtSubscriber::builder().with_max_level(Level::TRACE).finish();
    // let subscriber = tracing_subscriber::fmt().with_writer(std::io::stdout).with_max_level(Level::TRACE).finish();

    // https://docs.rs/tracing-subscriber/latest/tracing_subscriber/fmt/trait.MakeWriter.html
    fn make_my_great_writer() -> impl std::io::Write {
        std::io::stderr()
    }
    let subscriber = tracing_subscriber::fmt().with_writer(make_my_great_writer).finish();

    tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

    info!("testing");
    warn!("one");
    debug!("two");
    info!("three");
}

This is trickier than it sounds. It touches on an arguable flaw of async Rust: although "everyone knows" that println!, tracing::info!, etc. can block, and that you shouldn't call blocking functions in async code, we all often call them in async code anyway. (Try logging into a UNIX pipe with tracing_subscriber and you'll have problems ...)

You are using a bounded channel, so it can fill up, and then sending can block. You need to either use an unbounded channel or drop log messages when the channel is full. There's already a crate that does this (tracing_appender). Once you have chosen how to resolve the blocking issue, you need to create a type that wraps the channel and implements the Write trait.
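To make the "drop messages when the channel is full" option concrete, here is a minimal sketch that uses only the standard library. It assumes std::sync::mpsc::sync_channel as a stand-in for the Tokio channel (Tokio's bounded Sender has a try_send as well), and LossyChannelWriter and lossy_demo are hypothetical names invented for this illustration, not part of any crate.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical writer that forwards each `write` call to a bounded channel
/// and drops the message instead of blocking when the channel is full.
struct LossyChannelWriter {
    sender: SyncSender<Vec<u8>>,
    dropped: usize,
}

impl Write for LossyChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match self.sender.try_send(buf.to_vec()) {
            Ok(()) => {}
            // Channel full: count the loss and carry on rather than block.
            Err(TrySendError::Full(_)) => self.dropped += 1,
            // Receiver gone: nothing useful can be done with the message.
            Err(TrySendError::Disconnected(_)) => {}
        }
        // Report the whole buffer as consumed so callers never retry.
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Writes three messages into a channel with room for two and returns
/// what arrived plus the number of dropped messages.
fn lossy_demo() -> (Vec<String>, usize) {
    let (tx, rx): (SyncSender<Vec<u8>>, Receiver<Vec<u8>>) = sync_channel(2);
    let mut writer = LossyChannelWriter { sender: tx, dropped: 0 };
    for msg in ["one\n", "two\n", "three\n"] {
        writer.write_all(msg.as_bytes()).unwrap();
    }
    let dropped = writer.dropped;
    drop(writer); // closes the channel so the iterator below terminates
    let received = rx
        .iter()
        .map(|bytes| String::from_utf8(bytes).unwrap())
        .collect();
    (received, dropped)
}
```

Because nothing reads from the channel while the three writes happen, the third message finds the channel full and is counted as dropped rather than blocking the caller.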

Thank you @user16251, I switched to tracing_appender.

But I still can't figure out how to make tracing_appender send to the channel instead of std::io::stderr.

use tracing::{debug, info, warn, Level};
use tokio::io::AsyncWriteExt;

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(100);

    let handle = tokio::task::spawn(async move {
        let mut stdout = tokio::io::stdout();

        while let Some(out) = rx.recv().await {
            stdout.write_all(&out).await.unwrap();
            stdout.flush().await.unwrap();
        }
    });

    tx.send("test\n\n".as_bytes().to_vec()).await.unwrap();

    let (writer, guard) = tracing_appender::non_blocking(std::io::stderr());

    let subscriber = tracing_subscriber::fmt().with_writer(writer).with_max_level(Level::TRACE).finish();

    tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

    info!("testing");
    warn!("one");
    debug!("two");
    info!("three");
}

You can have the MakeWriter return a struct that writes into the channel:

use std::io::{self, Write};
use tokio::sync::mpsc;
use tracing_subscriber::fmt::MakeWriter;

struct ChannelMakeWriter {
    chan: mpsc::Sender<Vec<u8>>,
}

impl<'a> MakeWriter<'a> for ChannelMakeWriter {
    type Writer = ChannelWriter;

    fn make_writer(&'a self) -> Self::Writer {
        ChannelWriter {
            sender: self.chan.clone(),
            buffer: Vec::new(),
        }
    }
}

struct ChannelWriter {
    sender: mpsc::Sender<Vec<u8>>,
    buffer: Vec<u8>,
}

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.buffer.extend(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

// This function is called once the Subscriber is done with the writer
impl Drop for ChannelWriter {
    fn drop(&mut self) {
        let data = std::mem::take(&mut self.buffer);
        // We can't do anything if the channel is closed.
        // On a bounded channel, blocking_send blocks while the channel is
        // full and panics if called from within an async runtime context,
        // so consider an unbounded channel (UnboundedSender::send) instead.
        let _ = self.sender.blocking_send(data);
    }
}

Then set the writer on the SubscriberBuilder and it should work.

tracing-appender is probably not necessary, because writing to the ChannelWriter won't block for any I/O.
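To illustrate the buffer-then-send-on-drop idea in isolation, here is a small standard-library-only sketch. It assumes an unbounded std::sync::mpsc::channel in place of the Tokio channel, and DropSendWriter and per_event_demo are hypothetical names made up for this example. It simulates what a MakeWriter-style setup does: a fresh writer per event, dropped when the event has been written.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{channel, Sender};

/// Buffers every `write` call and sends the bytes as one message on drop.
struct DropSendWriter {
    sender: Sender<Vec<u8>>,
    buffer: Vec<u8>,
}

impl Write for DropSendWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.buffer.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Drop for DropSendWriter {
    fn drop(&mut self) {
        let data = std::mem::take(&mut self.buffer);
        // `send` on an unbounded std channel never blocks; an error only
        // means the receiver is gone, which cannot be handled here.
        let _ = self.sender.send(data);
    }
}

/// Simulates two events, each written in several pieces to a fresh writer,
/// and returns the messages the receiver sees.
fn per_event_demo() -> Vec<String> {
    let (tx, rx) = channel();
    for event in [["INFO ", "testing", "\n"], ["WARN ", "one", "\n"]] {
        let mut writer = DropSendWriter {
            sender: tx.clone(),
            buffer: Vec::new(),
        };
        for piece in event {
            writer.write_all(piece.as_bytes()).unwrap();
        }
        // `writer` is dropped here, which sends the whole event at once.
    }
    drop(tx); // close the channel so the iterator below terminates
    rx.iter()
        .map(|bytes| String::from_utf8(bytes).unwrap())
        .collect()
}
```

Each event arrives as exactly one message, however many write calls it took to produce it. The flip side is that nothing is sent while a writer is still alive.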

Thank you very much @cod10129! It's almost there - I just need to figure out how to fix the missing Fn trait implementation.

use std::io::{self, Write};
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use tracing::{debug, info, warn, Level};
use tracing_subscriber::fmt::MakeWriter;

struct ChannelMakeWriter {
    chan: mpsc::Sender<Vec<u8>>,
}

impl<'a> MakeWriter<'a> for ChannelMakeWriter {
    type Writer = ChannelWriter;

    fn make_writer(&'a self) -> Self::Writer {
        ChannelWriter {
            sender: self.chan.clone(),
            buffer: Vec::new(),
        }
    }
}

struct ChannelWriter {
    sender: mpsc::Sender<Vec<u8>>,
    buffer: Vec<u8>,
}

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.buffer.extend(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

// This function is called once the Subscriber is done with the writer
impl Drop for ChannelWriter {
    fn drop(&mut self) {
        let data = std::mem::take(&mut self.buffer);
        // We can't do anything if the channel is closed
        // This will only block if there isn't capacity,
        // so use an unbounded channel
        let _ = self.sender.blocking_send(data);
    }
}

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(100);

    let handle = tokio::task::spawn(async move {
        let mut stdout = tokio::io::stdout();

        while let Some(out) = rx.recv().await {
            stdout.write_all(&out).await.unwrap();
            stdout.flush().await.unwrap();
        }
    });

    tx.send("test".as_bytes().to_vec()).await.unwrap();

    let make_writer = ChannelMakeWriter { chan: tx };

    let writer = ChannelMakeWriter::make_writer(&make_writer);

    // let subscriber = tracing_subscriber::FmtSubscriber::builder().with_writer(writer).with_max_level(Level::TRACE).finish();
    // let subscriber = tracing_subscriber::fmt().with_writer(writer).with_max_level(Level::TRACE).finish();

    // the trait `Fn()` is not implemented for `ChannelWriter`, which is required by `for<'writer> ChannelWriter: MakeWriter<'writer>`
    //   where
    //   W2: for<'writer> MakeWriter<'writer> + 'static,
    //       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `SubscriberBuilder::<N, E, F, W>::with_writer`
    let subscriber = tracing_subscriber::fmt().with_writer(writer).finish();

    // the trait `Fn()` is not implemented for `ChannelWriter`, which is required by `for<'writer> ChannelWriter: MakeWriter<'writer>`
    // let subscriber = tracing_subscriber::fmt::SubscriberBuilder::default().with_max_level(tracing::Level::DEBUG).with_writer(writer).with_line_number(true).init();

    tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

    info!("testing");
    warn!("one");
    debug!("two");
    info!("three");
}

The Fn family of traits can only be implemented directly on nightly (with #![feature(fn_traits)]); they are NOT implementable in stable Rust.

The error message

"Fn is not implemented for ..."

is misleading. What you are doing wrong is passing the MakeWriter::Writer to SubscriberBuilder::with_writer(); you are supposed to pass the MakeWriter itself.

The error message really should be:

for<'w> MakeWriter<'w> is not implemented for ...

However, there is a convenient blanket implementation of MakeWriter that lets you pass closures (or functions) directly to SubscriberBuilder::with_writer(), and unfortunately it confuses the compiler into reporting the missing Fn bound instead.

The blanket implementation looks like:

impl<'a, F, W> MakeWriter<'a> for F where F: Fn() -> W, W: Write { 
...
}
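
To show how such a blanket implementation behaves, here is a reduced sketch that depends only on the standard library. MakeWriterLike, log_event, SharedBuf and blanket_demo are hypothetical names for this illustration; the real MakeWriter trait also has a lifetime parameter, which is left out here.

```rust
use std::io::{self, Write};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `MakeWriter`, reduced to its basic shape.
trait MakeWriterLike {
    type Writer: Write;
    fn make_writer(&self) -> Self::Writer;
}

/// Blanket implementation: any `Fn() -> W` with `W: Write` is a maker.
impl<F, W> MakeWriterLike for F
where
    F: Fn() -> W,
    W: Write,
{
    type Writer = W;

    fn make_writer(&self) -> W {
        self()
    }
}

/// Hypothetical consumer: it takes the maker and asks it for a fresh
/// writer for every line it logs.
fn log_event<M: MakeWriterLike>(maker: &M, line: &str) -> io::Result<()> {
    let mut writer = maker.make_writer();
    writer.write_all(line.as_bytes())
}

/// A writer that appends to a shared in-memory buffer.
struct SharedBuf(Arc<Mutex<Vec<u8>>>);

impl Write for SharedBuf {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.lock().unwrap().extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Logs two lines through a closure-based maker and returns the output.
fn blanket_demo() -> String {
    let shared = Arc::new(Mutex::new(Vec::new()));
    let for_closure = Arc::clone(&shared);
    // The closure is the maker. Passing a `SharedBuf` value to `log_event`
    // would fail to compile, because `SharedBuf` is a writer, not a maker.
    let maker = move || SharedBuf(Arc::clone(&for_closure));
    log_event(&maker, "one\n").unwrap();
    log_event(&maker, "two\n").unwrap();
    let bytes = shared.lock().unwrap().clone();
    String::from_utf8(bytes).unwrap()
}
```

With only the blanket implementation in scope, the compiler's sole candidate for a non-maker type is the Fn() bound, which is presumably why the diagnostic talks about Fn rather than about the maker trait.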

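You don't need to make ChannelMakeWriter; just use ChannelWriter directly.

use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{fmt, Registry};

struct ChannelWriter {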
    sender: mpsc::Sender<Vec<u8>>,
    buffer: Vec<u8>,
}

impl std::io::Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.buffer.extend(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

impl Drop for ChannelWriter {
    fn drop(&mut self) {
        let data = std::mem::take(&mut self.buffer);
        let _ = self.sender.blocking_send(data);
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(100);
    let handle = tokio::spawn(async move {
        let mut stdout = tokio::io::stdout();
        while let Some(out) = rx.recv().await {
            stdout.write_all(b"I AM FROM CHANNEL").await.unwrap();
            stdout.write_all(&out).await.unwrap();
            stdout.flush().await.unwrap();
        }
    });

    tx.send("HI".as_bytes().to_vec()).await.unwrap();
    let (nb, _guard) = tracing_appender::non_blocking(ChannelWriter {
        sender: tx,
        buffer: vec![],
    });
    let subscriber = Registry::default()
        .with(fmt::Layer::default().with_writer(nb))
        .with(fmt::Layer::default().with_writer(std::io::stdout));

    tracing::subscriber::set_global_default(subscriber).expect("unable to set global subscriber");
    tracing::info!("Hello, world!");
    tracing::warn!("WARNING HELLO");
}

Thank you very much @PolynomialParody!

I think there is an issue somewhere. If I comment out the second layer (the one writing to std::io::stdout), I don't get any output. It doesn't seem to be sending any messages.

use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::fmt;
use tracing_subscriber::Registry;

struct ChannelWriter {
    sender: mpsc::Sender<Vec<u8>>,
    buffer: Vec<u8>,
}

impl std::io::Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.buffer.extend(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

impl Drop for ChannelWriter {
    fn drop(&mut self) {
        let data = std::mem::take(&mut self.buffer);
        let _ = self.sender.blocking_send(data);
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(100);
    let handle = tokio::spawn(async move {
        let mut stdout = tokio::io::stdout();

        while let Some(out) = rx.recv().await {
            // stdout.write_all(b"I AM FROM CHANNEL").await.unwrap();
            stdout.write_all(dbg!(&out)).await.unwrap();
            stdout.flush().await.unwrap();
        }
    });

    tx.send("HI\n\n".as_bytes().to_vec()).await.unwrap();
    let (nb, _guard) = tracing_appender::non_blocking(ChannelWriter {
        sender: tx,
        buffer: vec![],
    });
    let subscriber = Registry::default()
        .with(fmt::Layer::default().with_writer(nb));
        // .with(fmt::Layer::default().with_writer(std::io::stdout))

    tracing::subscriber::set_global_default(subscriber).expect("unable to set global subscriber");
    tracing::info!("Hello, world!");
    tracing::warn!("WARNING HELLO");

    sleep(Duration::from_secs(5)).await;
}
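
One possible explanation, offered as a guess rather than a confirmed diagnosis: ChannelWriter only sends from Drop, and with tracing_appender::non_blocking there is a single ChannelWriter that stays alive for as long as the guard does, so its buffer just keeps growing. Assuming the appender's worker thread calls flush() after writing, sending from flush() instead might help. The sketch below shows the idea with the standard library only; it uses std::sync::mpsc::channel as a stand-in for the Tokio channel, and FlushSendWriter and long_lived_demo are hypothetical names.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{channel, Sender};

/// Hypothetical variant that sends buffered bytes on `flush`
/// instead of waiting for the writer to be dropped.
struct FlushSendWriter {
    sender: Sender<Vec<u8>>,
    buffer: Vec<u8>,
}

impl Write for FlushSendWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.buffer.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        if !self.buffer.is_empty() {
            let data = std::mem::take(&mut self.buffer);
            self.sender
                .send(data)
                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "receiver dropped"))?;
        }
        Ok(())
    }
}

impl Drop for FlushSendWriter {
    fn drop(&mut self) {
        // Deliver whatever is still buffered; errors cannot be reported here.
        let _ = self.flush();
    }
}

/// Keeps one writer alive and counts the messages received before and
/// after an explicit flush.
fn long_lived_demo() -> (usize, usize) {
    let (tx, rx) = channel();
    let mut writer = FlushSendWriter {
        sender: tx,
        buffer: Vec::new(),
    };
    writer.write_all(b"Hello, world!\n").unwrap();
    let before_flush = rx.try_iter().count(); // writer alive, nothing sent
    writer.flush().unwrap();
    let after_flush = rx.try_iter().count(); // flush delivered the buffer
    (before_flush, after_flush)
}
```

While the writer is alive, nothing reaches the receiver until flush() is called, which matches the symptom of seeing no output from the channel.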