How to send a Writer into a thread?


#1

I currently have a function that takes a Writer and writes into it. I need to change it to make the write async. Here’s a simplified version of my attempt:

link: https://play.rust-lang.org/?gist=e13bddc3904468d2de1d&version=stable

code:

use std::io::Write;
use std::thread;

fn main() {
    let mut buffer = Vec::new();
    write_to(&mut buffer);
}

fn write_to<W: Write>(mut buffer: W) {
    thread::spawn(move||{
        buffer.write(b"hello").unwrap();
    });
    
}

This, of course, doesn’t compile because Write cannot be shared between threads. My main question is, why? And how to solve it?

The write_to function is supposed to be exposed to users of my library. They call it, passing in a writer, but never actually read from it. It could be a file or stdout. For now I’d rather not use channels for this, unless they are used only internally.

This is a continuation from my previous question here: How to test output to stdout


#2

What needs to be sent (in this case, not shared) between threads is not Write (that’s only a trait), but W. You can give a W: Write + Send + 'static bound. (The 'static bound is required because it makes sure that W doesn’t contain anything that might not live as long as the thread.)

You’ll still have a problem because you’re passing a mutable reference to write_to; this will run into a lifetime error because the thread can outlive main(). Passing write_to(buffer) will work.


#3

<anon>:10:5: 10:18 error: the trait `core::marker::Send` is not implemented for the type `W` [E0277]
<anon>:10     thread::spawn(move||{
              ^~~~~~~~~~~~~

fn write_to<W: Write + Send>(mut buffer: W) {
    ...
}

<anon>:10:5: 10:18 error: the parameter type `W` may not live long enough [E0310]
<anon>:10     thread::spawn(move||{
              ^~~~~~~~~~~~~
<anon>:10:5: 10:18 help: see the detailed explanation for E0310
<anon>:10:5: 10:18 help: consider adding an explicit lifetime bound `W: 'static`...

fn write_to<W: Write + Send + 'static>(mut buffer: W) {
    ...
}

<anon>:6:19: 6:25 error: `buffer` does not live long enough
<anon>:6     write_to(&mut buffer);
                           ^~~~~~
note: reference must be valid for the static lifetime...

fn main() {
    let mut buffer = Vec::new();
    write_to(buffer);
}

<anon>:5:9: 5:19 warning: variable does not need to be mutable, #[warn(unused_mut)] on by default
<anon>:5     let mut buffer = Vec::new();
                 ^~~~~~~~~~

fn main() {
    let buffer = Vec::new();
    write_to(buffer);
}

This only really makes sense if you’re writing to something like a file, as otherwise the written data just disappears along with the writer. If you want to write to something that you then read back from, well, that’s a different kettle of fish. You’ll need to either have the thread pass ownership of the W back (which means blocking on the thread, at which point it’d be easier to not bother with a thread in the first place, or to just use a channel and return that), or write an implementation of Write for a type wrapping something like Arc<Mutex<W>>, which is pretty straightforward.
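
The first option, having the thread hand the writer back, could look roughly like the sketch below. This is a hypothetical variant of write_to: the JoinHandle<W> return type is an assumption and not part of the original code, and write_all is used instead of write so that a partial write cannot go unnoticed.

```rust
use std::io::Write;
use std::thread::{self, JoinHandle};

// Hypothetical variant of write_to: the closure returns the writer,
// so whoever joins the thread gets ownership of it back.
fn write_to<W: Write + Send + 'static>(mut buffer: W) -> JoinHandle<W> {
    thread::spawn(move || {
        buffer.write_all(b"hello").unwrap();
        buffer
    })
}

fn main() {
    let handle = write_to(Vec::new());
    // join() blocks until the thread has finished, then yields the writer.
    let buffer: Vec<u8> = handle.join().unwrap();
    println!("{}", String::from_utf8_lossy(&buffer));
}
```

The caller only blocks at the point where it calls join(), so it can still do other work between spawning the write and reading the result.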


#4

How does that even work, if Vec doesn’t implement Send?


#5

Well, it does. Send is one of the weird traits (originally known as OIBITs, now called auto traits; don’t worry about what OIBIT means, the name is deeply misleading) that is automatically implemented for every type, except for types where it is specifically un-implemented, or types containing other types for which it is un-implemented.

Practical upshot: Vec<T> is Send whenever T is Send, although there’s no direct way to know that other than just trying it and seeing if it works. It makes sense because a Vec owns its data and isn’t tied to a particular thread, so there’s no reason you shouldn’t be able to send one between threads.
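
One cheap way to "just try it" without spawning a thread is a helper function with a Send bound, so the check happens at compile time. The name assert_send is made up for this sketch; it is not a standard library function.

```rust
// Hypothetical helper: a call to it only compiles if T is Send.
fn assert_send<T: Send>() {}

fn main() {
    assert_send::<Vec<u8>>();
    assert_send::<std::fs::File>();
    assert_send::<std::io::Stdout>();

    // Uncommenting the next line gives a compile error, because Rc is
    // one of the types for which Send is explicitly un-implemented:
    // assert_send::<std::rc::Rc<u8>>();
}
```

If the program compiles, every type listed is Send; nothing interesting happens at run time.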


#6

Ha! Thanks for explanation!

“This trait is automatically derived when the compiler determines it’s appropriate.”

I must have missed that when reading the documentation.

I’ll try to write a Write implementation for Arc<Mutex<W>>, see how much fun it is xD


#7

I was able to sketch the general idea: https://play.rust-lang.org/?gist=9b96d12b3a7e5ae1abaa&version=stable

use std::io;
use std::io::Write;
use std::fmt::{Debug, Formatter, Error};
use std::thread;
use std::time::Duration;
use std::sync::{Arc, Mutex};

struct Output<W>(Arc<Mutex<W>>);

impl<W: Write> Write for Output<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        (*self.0.lock().unwrap()).write(buf)
    }
    
    fn flush(&mut self) -> io::Result<()> {
        (*self.0.lock().unwrap()).flush()
    }
}

impl<W: Write> Clone for Output<W> {
    fn clone(&self) -> Self {
        Output(self.0.clone())
    }
}

impl<W: Debug> Debug for Output<W> {
    fn fmt(&self, fmt: &mut Formatter) -> Result<(), Error> {
        (*self.0.lock().unwrap()).fmt(fmt)
    }
}

fn main() {
    let buffer = Output(Arc::new(Mutex::new(Vec::new())));
    write_to(&buffer);
    
    thread::sleep(Duration::new(1, 0));
    
    println!("{:?}", buffer);
}

fn write_to<W: Write + Send + 'static>(buffer: &Output<W>) {
    let mut thread_buffer = buffer.clone();
    
    thread::spawn(move||{
        thread_buffer.write(b"hello").unwrap();
    });
}

Any advice?


#8

Whether the Output type is necessary depends on the API you want. You could also simply do

fn write_to<W: Write + Send + 'static>(buffer: &Arc<Mutex<W>>) {
    let thread_buffer = buffer.clone();
    thread::spawn(move || {
        thread_buffer.lock().unwrap().write(b"hello").unwrap();
    });
}

#9

Awesome!

But I think the API will benefit more from an abstraction around output.


#10

You can simplify it further:

use std::io;
use std::io::Write;
use std::fmt::{Debug, Formatter, Error};
use std::thread;
use std::time::Duration;
use std::sync::{Arc, Mutex};

#[derive(Clone)]
struct Output<W>(Arc<Mutex<W>>);

impl<W: Write> Output<W> {
    pub fn new(w: W) -> Self {
        Output(Arc::new(Mutex::new(w)))
    }
}

impl<W: Write> Write for Output<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        (*self.0.lock().unwrap()).write(buf)
    }
    
    fn flush(&mut self) -> io::Result<()> {
        (*self.0.lock().unwrap()).flush()
    }
}

impl<W: Debug> Debug for Output<W> {
    fn fmt(&self, fmt: &mut Formatter) -> Result<(), Error> {
        (*self.0.lock().unwrap()).fmt(fmt)
    }
}

fn main() {
    let buffer = Output::new(Vec::new());
    write_to(buffer.clone());

    thread::sleep(Duration::new(1, 0));

    println!("{:?}", buffer);
}

fn write_to<W: Write + Send + 'static>(mut buffer: W) {
    thread::spawn(move || buffer.write(b"hello").unwrap());
}

There’s no need to specialise write_to; the whole point of Output is that it’s something that is both Write and Send and shareable… but write_to only needs those first two things. Also, if you are always going to clone, then it’s better to just take by-value and let the caller decide if they need to clone or not.
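
One caveat worth checking: #[derive(Clone)] on a generic struct adds a W: Clone bound, so the derived impl would not make Output<File> cloneable, even though cloning only copies the Arc. The sketch below keeps a manual Clone impl for that reason. Sink is a hypothetical writer that is deliberately not Clone, and returning a JoinHandle from write_to (so the caller can join instead of sleeping) is my own variation, not part of the code above.

```rust
use std::io::{self, Write};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

struct Output<W>(Arc<Mutex<W>>);

impl<W> Output<W> {
    fn new(w: W) -> Self {
        Output(Arc::new(Mutex::new(w)))
    }
}

// Manual impl: cloning only bumps the Arc's reference count,
// so no `W: Clone` bound is needed.
impl<W> Clone for Output<W> {
    fn clone(&self) -> Self {
        Output(Arc::clone(&self.0))
    }
}

impl<W: Write> Write for Output<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.lock().unwrap().write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.lock().unwrap().flush()
    }
}

// Hypothetical writer that does not implement Clone.
struct Sink(Vec<u8>);

impl Write for Sink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

// Takes any writer by value; the caller decides whether to pass a clone.
fn write_to<W: Write + Send + 'static>(mut buffer: W) -> JoinHandle<()> {
    thread::spawn(move || buffer.write_all(b"hello").unwrap())
}

fn main() {
    let output = Output::new(Sink(Vec::new()));

    // Pass a clone so `output` can still be inspected afterwards.
    write_to(output.clone()).join().unwrap();

    // A writer that is never read back can simply be moved in.
    write_to(io::stdout()).join().unwrap();

    let sink = output.0.lock().unwrap();
    println!("{}", String::from_utf8_lossy(&sink.0));
}
```

With the derived impl, the output.clone() call here would not produce an Output<Sink>, because Sink is not Clone.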