Auto clone a clonable object when used in a new thread


#1

The problem comes from the log crate: when I implement my own logger, one thing is difficult to handle. The purpose of the implementation is simple: the thread that logs should not block on writing to the file; another thread handles the file writing.

pub type MsgType = (LogLevel, String, String);
pub type LogSender = Arc<Mutex<Sender<MsgType>>>;

pub struct FileLogger {
    sender: LogSender,
    max_level: LogLevel,
}

impl FileLogger {

    pub fn new(sender: LogSender, level:LogLevel) -> Self {
        FileLogger {
            sender: sender,
            max_level: level,
        }
    }
}

impl log::Log for FileLogger {

    fn enabled(&self, metadata: &LogMetadata) -> bool {
        metadata.level() <= self.max_level
    }

    fn log(&self, record: &LogRecord) {
        let mt = record.metadata();
        if self.enabled(mt) {
            let head = format!("[{}]", mt.target());
            let arc_sender = self.sender.clone();
            let sender_lock = arc_sender.try_lock();
            if let Ok(sender) = sender_lock {
                let rst = sender.send((record.level(), head, record.args().to_string()));
                if let Err(e) = rst {
                    println!("{:?}", e);
                }
            } else {
                let dt = Local::now();
                let colored = format!("[{}] [{}] {}", dt.format("%Y-%m-%d %H:%M:%S%.9f"), record.level(), head);
                println!("{} {}", colored, record.args().to_string());
            }
        }
    }
}

    let (sx, rx) = mpsc::channel::<MsgType>();
    let sx = Arc::new(Mutex::new(sx));
    let _ = log::set_logger(|max_log_level| {
        max_log_level.set(LogLevelFilter::Trace);
        Box::new(FileLogger::new(sx, level))
    }).unwrap();
    let writer = FileLoggerWriter::new(rx);
    let _ = start_log_spawn(writer);

The FileLogger must be Sync. The Sender is Clone, but I have no chance to clone it for each thread, so I ended up wrapping it in Arc<Mutex<>>, which is very bad. What should I do?


#2

Try using a (single thread) threadpool, like rayon, instead of mpsc channels.


#3

Maybe I didn’t describe the problem very clearly. In short, I want something like thread_local, but thread_local copies the object; I need something that is thread-local but clones the object. I have read about rayon and found it difficult to use, and I think it’s important to have something thread-local that clones the object. It’s an important language feature; in C or C++ it’s so easy to handle this situation.


#4

How about this?

use std::sync::Arc;
use std::cell::RefCell;

#[derive(Clone)]
struct MyType;

fn thread(global: Arc<MyType>) {
    thread_local! {
        static CACHED: RefCell<Option<MyType>> = RefCell::new(None);
    }
    CACHED.with(|cached| {
        let mut borrowed = cached.borrow_mut();
        let local: &mut MyType = borrowed.get_or_insert_with(|| (*global).clone());
        
        // use `local` here
    })
}

I need something called "lazy_clone", what should I do?


#5

Your code is right, but it can’t solve the problem. I wrote an example.
Yours:

use std::thread;
use std::sync::Arc;
use std::cell::RefCell;

fn main()
{
    let global = Arc::new(MyType);

    let g1 = global.clone();
    let handle1 = thread::spawn(move || {
         let _ = thread(g1);
    });

    let g2 = global.clone();
    let handle2 = thread::spawn(move || {
         let _ = thread(g2);
    });

    handle1.join().unwrap();
    handle2.join().unwrap();
}

#[derive(Clone)]
struct MyType;

fn thread(global: Arc<MyType>) {
    //do something here
}

The problem is:

use std::thread;
use std::sync::{Mutex, Arc};
use std::cell::RefCell;

#[macro_use]
extern crate lazy_static;

lazy_static! {
    static ref GLOBAL:Arc<MyType> = Arc::new(MyType);
    //static ref GLOBAL:Arc<Mutex<MyType>> = Arc::new(Mutex::new(MyType));
}

fn main()
{
    let handle1 = thread::spawn(move || {
         let _ = thread(); //with a Mutex, each call locks once
         let _ = thread(); //with a Mutex, each call locks once
         let _ = thread(); //with a Mutex, each call locks once
         let _ = thread(); //with a Mutex, each call locks once
         let _ = thread(); //with a Mutex, each call locks once
         let _ = thread(); //with a Mutex, each call locks once
    });

    let handle2 = thread::spawn(move || {
         let _ = thread(); //with a Mutex, each call locks once
         let _ = thread(); //with a Mutex, each call locks once
         let _ = thread(); //with a Mutex, each call locks once
         let _ = thread(); //with a Mutex, each call locks once
    });

    handle1.join().unwrap();
    handle2.join().unwrap();
}

#[derive(Clone)]
struct MyType;

//the log API is like this: you have no chance to clone the global before the sub-thread starts
fn thread() {
    //how to use GLOBAL here?
    //I want something like this
    thread_local_var! { //hypothetical macro, would need to be implemented
        let global_op:Option<Arc<MyType>> = None;
    }

    //clone only once per thread (the mutex way locks many times)
    if global_op.is_none() {
        thread_local_clone! { //hypothetical macro, would need to be implemented
            global_op = GLOBAL.clone(); //how to implement this? I want a lazy clone, made from the sub-thread, not the main thread
        }
    }

    if let Some(global) = global_op {
        //use global here
    }

    //with a mutex it would look like this: easy but not efficient, since one thread locks many times
    //let global = GLOBAL.lock().unwrap(); //then I can use global
}

I want the most efficient way to handle this situation, not merely one that works.
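
For illustration, here is a minimal std-only sketch of what the pseudocode above seems to ask for: a global that each thread clones lazily, once, on first use. It assumes `std::sync::OnceLock` (Rust 1.70+) as a stand-in for lazy_static; `use_global`, `run_demo`, the `CLONES` counter and the `name` field are hypothetical names added only for the demonstration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::thread;

struct MyType {
    name: &'static str,
}

// Process-wide value, initialized on first access (std-only stand-in for lazy_static).
static GLOBAL: OnceLock<Arc<MyType>> = OnceLock::new();

// Counts how many thread-local clones were made (demonstration only).
static CLONES: AtomicUsize = AtomicUsize::new(0);

fn global() -> &'static Arc<MyType> {
    GLOBAL.get_or_init(|| Arc::new(MyType { name: "shared" }))
}

// Hypothetical stand-in for the `thread()` function in the post above.
fn use_global() -> usize {
    thread_local! {
        // The initializer runs lazily, once per thread, on the thread that uses it.
        static LOCAL: Arc<MyType> = {
            CLONES.fetch_add(1, Ordering::SeqCst);
            Arc::clone(global())
        };
    }
    // Later calls on the same thread reuse the cached clone.
    LOCAL.with(|local| local.name.len())
}

// Spawns two threads that each call `use_global` five times and
// returns the total number of clones that were made.
fn run_demo() -> usize {
    let handles: Vec<_> = (0..2)
        .map(|_| {
            thread::spawn(|| {
                for _ in 0..5 {
                    assert_eq!(use_global(), "shared".len());
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    CLONES.load(Ordering::SeqCst)
}
```

Calling `run_demo()` once from `main` should report two clones for the ten calls, one per spawned thread, since the `thread_local!` initializer only runs the first time each thread touches `LOCAL`.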


#6

So you want a thread-local Sender and a Receiver tied to a background thread?

One option might be to combine a lazy_static Mutex<(Sender<...>, Option<Receiver<...>>)> with a thread_local that contains a cloned Sender. The Receiver is take()n out from the Option during initialization and moved to the background thread. Something like:

#[macro_use]
extern crate lazy_static;

use std::sync::mpsc::*;
use std::sync::Mutex;

lazy_static! {
    static ref G: Mutex<(Sender<i32>, Option<Receiver<i32>>)> = {
        let (tx, rx) = channel();
        Mutex::new((tx, Some(rx)))
    };
}

fn main() {
    let rx = G.lock().unwrap().1.take().unwrap();
    // move rx to background thread
}

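fn log() {
    thread_local! {
        // initialized lazily, once per thread: G is locked only to clone the Sender
        static S: Sender<i32> = G.lock().unwrap().0.clone();
    }
    // send without taking any lock; ignore the error if the receiver is gone
    let _ = S.with(|sender| sender.send(5));
}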
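
To make the idea above concrete, here is a hedged, std-only variant that also moves the receiver to a background thread. It assumes `std::sync::OnceLock` (Rust 1.70+) in place of lazy_static and `String` messages instead of `i32`; `global_channel`, `take_receiver` and `run_demo` are hypothetical helper names, and the writer only collects lines instead of writing a file.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Mutex, OnceLock};
use std::thread;

type Channel = Mutex<(Sender<String>, Option<Receiver<String>>)>;

// Global channel, created on first access.
static G: OnceLock<Channel> = OnceLock::new();

fn global_channel() -> &'static Channel {
    G.get_or_init(|| {
        let (tx, rx) = channel();
        Mutex::new((tx, Some(rx)))
    })
}

// Takes the receiver out of the global; returns None if it was already taken.
fn take_receiver() -> Option<Receiver<String>> {
    global_channel().lock().unwrap().1.take()
}

fn log(msg: &str) {
    thread_local! {
        // The mutex is locked once per thread, only to clone the Sender.
        static S: Sender<String> = global_channel().lock().unwrap().0.clone();
    }
    S.with(|sender| {
        // No lock on this path; the error is ignored if the writer is gone.
        let _ = sender.send(msg.to_string());
    });
}

// Starts a writer thread and two logging threads, then returns the
// collected lines in sorted order.
fn run_demo() -> Vec<String> {
    let rx = take_receiver().expect("receiver already taken");
    // The global Sender is never dropped, so the writer stops after a fixed
    // number of messages; a real writer would need a shutdown signal.
    let writer = thread::spawn(move || {
        let lines: Vec<String> = rx.iter().take(4).collect();
        lines
    });
    let workers: Vec<_> = (0..2)
        .map(|id| {
            thread::spawn(move || {
                log(&format!("worker {id}: first"));
                log(&format!("worker {id}: second"));
            })
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }
    let mut lines = writer.join().unwrap();
    lines.sort();
    lines
}
```

The order in which messages from different threads arrive is not deterministic, which is why the sketch sorts the lines before returning them.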

#7

@limitee Out of curiosity, what do you expect to happen when the message queue on the writer thread backs up? I suspect that if you are currently I/O bound with inline writes, you will continue to be I/O bound with a dedicated writer thread. That holds until you either 1) run out of memory and get OOM-killed, or 2) hit a queue size maximum and go back to the same old blocking behavior anyway.


#8

I don’t want to speak for @limitee but my own thoughts on async log writing/flushing:

  1. If you write in the foreground, you can hit a momentary stall even if the write is going to the kernel’s page cache. You may not submit another log for a while, yet the system’s I/O is momentarily oversubscribed (perhaps a noisy neighbor). That would be annoying to accept.
  2. There might be additional enrichment of the message or processing (e.g. statistic/metric accumulation), and if that can be done in the background, that’s better.
  3. Using a channel allows you to deal with backpressure as you see fit. The unbounded channel is not the way to go there, but putting a bounded one would allow you to, e.g., drop the message altogether or take some other action, perhaps still blocking if need be. With foreground file writes, you have no such luxury.
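
As a rough sketch of point 3, a bounded `std::sync::mpsc::sync_channel` combined with `try_send` lets the sender drop a message instead of blocking when the queue is full. The function name `send_with_drop` and the drop-on-full policy are assumptions made for illustration; no receiver thread is running here so that the result is deterministic.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Tries to enqueue every message without blocking. Returns the messages that
// fit into the bounded queue and the number of messages that were dropped.
fn send_with_drop(capacity: usize, messages: &[&str]) -> (Vec<String>, usize) {
    let (tx, rx) = sync_channel::<String>(capacity);
    let mut dropped = 0;
    for &msg in messages {
        match tx.try_send(msg.to_string()) {
            Ok(()) => {}
            // Queue is full: apply the back-pressure policy (here: drop).
            Err(TrySendError::Full(_)) => dropped += 1,
            // Receiver is gone: nothing more can be delivered.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    // Dropping the sender lets the receiving iterator terminate.
    drop(tx);
    let delivered: Vec<String> = rx.iter().collect();
    (delivered, dropped)
}
```

With a capacity of 2 and five messages, the first two are queued and the remaining three are dropped. Replacing the `Full` arm with a blocking `tx.send(...)` would give the "still blocking if need be" behavior instead.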

#9

Yeah, I understand the tradeoffs. I ask because I have direct experience with these things in large production systems. And it’s always better to allow the infrastructure to handle logging instead of “contaminating” the application. That’s why it’s codified as one of the 12 factors.

Container filesystems, for example, may all be backed by tmpfs, so you don’t even necessarily see I/O wait during flush calls. And FWIW, I/O wait is itself a form of back pressure, even if it’s not generally configurable.


#10

Yeah, that would be great if the infrastructure can shield the app from latent stalls.

tmpfs would be a strange place to log if you want persistence across reboots/crashes, unless the data is mirrored/forwarded off tmpfs to physical storage by something else.

It’s not even the flush that may stall, but a plain write that is not permitted to dirty a new page (if it’s unlucky like that).

Yeah, but it allows the app no control - you get trapped in the stall.


#11

> tmpfs would be a strange place to log if you want persistence across reboots/crashes, unless the data is mirrored/forwarded off tmpfs to physical storage by something else.

We’re really off-topic here (sorry about that!) but I guess it would be best to mention that the 12 factor app guidelines anticipate that the infrastructure will also be configured to ship logs to permanent storage. Again, it’s not an application concern for how or where logs get stored, but a concern that your operations team (which may be you wearing a different hat!) manages. Some tools that are specifically called out are Logplex and Fluentd for shipping, and Splunk and Hadoop/Hive for indexing and storage, respectively.

> Yeah, but it allows the app no control - you get trapped in the stall.

The stipulation with “let the infrastructure manage it” also implies that the app doesn’t need such controls. The infrastructure does. The basis for the 12-factor app guidelines is one of separation of concerns. It’s the same reason we typically don’t write our own virtual memory systems, or implement thread context switching in each application. None of these things are important to the application’s business logic. Neither is logging. :)


Anyway, back on topic. I could be wrong, but it sounds like @limitee wants to avoid the mutex entirely? Your solution is effectively what I arrived at as well. The lock for Sender is only acquired when the thread_local is instantiated. I also wanted to try a lock-free data structure (like crossbeam) but you still need to use lazy_static to make the channel globally accessible, and thread_local to clone and cache…


#12

I have a lot more to say about logging but, indeed, we’re hijacking this thread :)

I think @limitee wants to avoid a lock acquisition per send() operation, which I completely sympathize with; a one-time (per-thread) acquisition to get a sender shouldn’t be a big deal at all.