Strategies for sending/cloning a struct containing non-`Send` fields to another thread?


#1

Hi everyone,

Apologies in advance for the lengthy post. I'm working on a small program that fetches data over HTTP, and I'm running into issues moving values between threads.

For testability, I've created an abstraction called URLFetcher as a trait. The idea is that the live program gets an implementation called URLFetcherLive, which performs real HTTP requests, while the test suite gets a URLFetcherPassThrough, which just returns data I've prepopulated.

The trait's definition isn't that important, but it looks roughly like this:

```rust
pub trait URLFetcher: Send + URLFetcherClone {
    fn fetch(&mut self, raw_url: String) -> Result<(Vec<u8>, String)>;
}
```
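The post doesn't show `URLFetcherClone`; it is presumably the usual "clone into a box" helper that lets a `Box<dyn URLFetcher>` be cloned. A minimal sketch assuming that pattern, with a `String` error standing in for the original `Result` alias and a hypothetical `responses` map as the pass-through's prepopulated data:

```rust
use std::collections::HashMap;

// Assumption: a plain String error replaces the poster's Result alias.
type Result<T> = std::result::Result<T, String>;

// Assumed shape of the helper: a blanket impl lets any 'static, Clone
// fetcher clone itself into a new Box<dyn URLFetcher>.
pub trait URLFetcherClone {
    fn clone_box(&self) -> Box<dyn URLFetcher>;
}

impl<T> URLFetcherClone for T
where
    T: 'static + URLFetcher + Clone,
{
    fn clone_box(&self) -> Box<dyn URLFetcher> {
        Box::new(self.clone())
    }
}

pub trait URLFetcher: Send + URLFetcherClone {
    fn fetch(&mut self, raw_url: String) -> Result<(Vec<u8>, String)>;
}

// Makes `.clone()` work directly on boxed trait objects.
impl Clone for Box<dyn URLFetcher> {
    fn clone(&self) -> Self {
        (**self).clone_box()
    }
}

// Hypothetical test double: canned responses keyed by URL.
#[derive(Clone, Debug, Default)]
pub struct URLFetcherPassThrough {
    pub responses: HashMap<String, (Vec<u8>, String)>,
}

impl URLFetcher for URLFetcherPassThrough {
    fn fetch(&mut self, raw_url: String) -> Result<(Vec<u8>, String)> {
        self.responses
            .get(&raw_url)
            .cloned()
            .ok_or_else(|| format!("no canned response for {}", raw_url))
    }
}
```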

The "live" version contains a Tokio Core and a Hyper Client, the idea being that connections are pooled across invocations wherever it's used locally. It looks roughly like:

```rust
#[derive(Clone, Debug)]
pub struct URLFetcherLive {
    pub client: Client<hyper::client::HttpConnector, hyper::Body>,
    pub core:   Core,
}
```

This worked fine until I introduced threads. In my program model, a single master boots up a number of worker threads, each of which should hold its own Hyper Client.

Client holds an Rc<_>, so when I try to send it to a worker I get an error like the following (more accurately, a whole bunch of them):

```text
error[E0277]: the trait bound `std::rc::Rc<std::cell::RefCell<hyper::client::pool::PoolInner<hyper::client::HyperClient<hyper::Body>>>>: std::marker::Send` is not satisfied in `url_fetcher::URLFetcherLive`
  --> src/url_fetcher.rs:46:6
   |
46 | impl URLFetcher for URLFetcherLive {
   |      ^^^^^^^^^^ `std::rc::Rc<std::cell::RefCell<hyper::client::pool::PoolInner<hyper::client::HyperClient<hyper::Body>>>>` cannot be sent between threads safely
   |
   = help: within `url_fetcher::URLFetcherLive`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::RefCell<hyper::client::pool::PoolInner<hyper::client::HyperClient<hyper::Body>>>>`
   = note: required because it appears within the type `hyper::client::pool::Pool<hyper::client::HyperClient<hyper::Body>>`
   = note: required because it appears within the type `hyper::client::Dispatch<hyper::Body>`
   = note: required because it appears within the type `hyper::Client<hyper::client::HttpConnector>`
   = note: required because it appears within the type `url_fetcher::URLFetcherLive`
```

I understand roughly why this is happening, but I don't know the best way out of it. Each thread could initialize its own URLFetcherLive, but that defeats the whole point of my abstraction: when I boot up the test suite for the master program, I'd like it to plumb a URLFetcherPassThrough all the way through to every worker. Similarly, when I run it live, I want every thread to get a URLFetcherLive, so basically a copy of whatever the master thread was initialized with. Workers shouldn't have to know about any particular concrete implementation.

It seems to me that what I need is some sort of cloning hook before a struct moves between threads: a way to tell the compiler, "I know this struct isn't Send, but look, I'm going to safely re-initialize its non-Send fields before the move, so it's okay." To my knowledge, this doesn't exist.

The only other solution I can think of is to introduce a meta-structure (think of it as a factory) that I pass between threads and then use to initialize an appropriate URLFetcher, with a brand new Tokio Core and Hyper Client if necessary.
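The factory idea works because the value that crosses the thread boundary only needs to know how to build the non-`Send` thing, not hold one. A minimal std-only sketch using a closure as the factory (a variant of the trait-based approach, not the poster's code); `NonSendClient`, `MakeClient` and `run_workers` are hypothetical, and the `Rc` stands in for the one inside hyper's connection pool:

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for hyper's Client: holding an Rc makes it !Send.
struct NonSendClient {
    pool: Rc<RefCell<Vec<String>>>,
}

impl NonSendClient {
    fn new() -> Self {
        NonSendClient { pool: Rc::new(RefCell::new(Vec::new())) }
    }

    // Records the URL in this client's private pool and returns how many
    // requests this client has handled so far.
    fn fetch(&self, url: &str) -> usize {
        self.pool.borrow_mut().push(url.to_string());
        self.pool.borrow().len()
    }
}

// The constructor is Send + Sync even though the value it builds is not.
type MakeClient = Arc<dyn Fn() -> NonSendClient + Send + Sync>;

// Each worker builds its own client inside its thread, so the non-Send
// value never crosses a thread boundary; only the constructor does.
fn run_workers(make: MakeClient, n: usize) -> Vec<usize> {
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let make = Arc::clone(&make);
            thread::spawn(move || {
                let client = make();
                client.fetch(&format!("http://example.com/{}", i));
                client.fetch("http://example.com/again")
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Each worker sees a count of 2, because every client starts with its own empty pool.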

Does anyone have any advice as to whether I’m missing something or thinking about this the wrong way?

Thanks in advance!


#2

I'm not experienced with this, but have you tried using an Arc pointer instead of Rc? I think wrapping it in an Arc should make it sendable and fix your issue.
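Swapping `Rc` for `Arc` only helps when you own the pointer, and even then `Arc<T>` is `Send` only if `T` is both `Send` and `Sync`, so an `Rc<RefCell<T>>` typically has to become `Arc<Mutex<T>>`. A std-only sketch; `SharedPool`, `check_send_bounds` and `push_from_threads` are illustrative names:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Compiles only if T: Send; a handy way to check auto-trait bounds.
fn assert_send<T: Send>() {}

// Hypothetical shared pool written the thread-safe way: Arc for shared
// ownership across threads, Mutex instead of RefCell for mutation.
struct SharedPool {
    inner: Arc<Mutex<Vec<String>>>,
}

fn check_send_bounds() {
    assert_send::<SharedPool>();
    assert_send::<Arc<Mutex<Vec<String>>>>();
    // Each of these would be rejected with E0277 if uncommented:
    // assert_send::<std::rc::Rc<std::cell::RefCell<Vec<String>>>>(); // Rc is never Send
    // assert_send::<Arc<std::cell::RefCell<Vec<String>>>>(); // RefCell is not Sync
}

// Every worker pushes one entry into the same pool; returns the final count.
fn push_from_threads(n: usize) -> usize {
    let pool = SharedPool { inner: Arc::new(Mutex::new(Vec::new())) };
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let inner = Arc::clone(&pool.inner);
            thread::spawn(move || {
                inner.lock().unwrap().push(format!("worker {}", i));
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Bind first so the MutexGuard is dropped before `pool`.
    let count = pool.inner.lock().unwrap().len();
    count
}
```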


#3

Thanks for the response!

So the Rc<_> here is actually a field nested somewhere inside Hyper's Client rather than one I directly control. I believe Client is intentionally not designed to be shared between threads.


#4

Ohh, I see. That's right. I faced the same issue a few weeks ago. Here is the only relevant thread I found: https://github.com/hyperium/hyper/issues/1082. I concluded that I should go single-threaded and run a cluster of distributed processes instead, one per core.


#5

Ah, thank you. That’s helpful — I’m glad to see that this has been raised before.


#6

It was raised, but I got the impression it isn't going to change.


#7

Alright, going to answer my own question here (although while this is an answer, there may be a better one):

I ended up working around the problem by implementing a factory meta-structure for my URLFetcher trait. The idea is that while specific URLFetcher implementations may be hard to share between threads because they contain complex structures like a Tokio Core or a Hyper Client, a factory is a simple facade that just knows how to instantiate a URLFetcher when needed.

The trait (note that it requires Send):

```rust
pub trait URLFetcherFactory: Send {
    // `Clone` requires `Sized`, which trait objects are not, so a boxed
    // factory is cloned through this method instead.
    fn clone_box(&self) -> Box<dyn URLFetcherFactory>;

    fn create(&self) -> Box<dyn URLFetcher>;
}
```
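Since `Clone` can't be implemented on the trait object itself, a common companion to `clone_box` is an `impl Clone for Box<dyn URLFetcherFactory>` that delegates to it, so call sites can use a plain `.clone()`. A sketch assuming simplified stand-in traits (`fetch` returns `Result<Vec<u8>, String>` here) and hypothetical `FixedFetcher`/`FixedFactory` test types:

```rust
// Minimal stand-in for the poster's URLFetcher trait.
pub trait URLFetcher {
    fn fetch(&mut self, raw_url: String) -> Result<Vec<u8>, String>;
}

pub trait URLFetcherFactory: Send {
    fn clone_box(&self) -> Box<dyn URLFetcherFactory>;
    fn create(&self) -> Box<dyn URLFetcher>;
}

// Lets a Box<dyn URLFetcherFactory> be cloned with `.clone()`.
impl Clone for Box<dyn URLFetcherFactory> {
    fn clone(&self) -> Self {
        (**self).clone_box()
    }
}

// Hypothetical fetcher that always returns the same bytes.
struct FixedFetcher(Vec<u8>);

impl URLFetcher for FixedFetcher {
    fn fetch(&mut self, _raw_url: String) -> Result<Vec<u8>, String> {
        Ok(self.0.clone())
    }
}

// Hypothetical factory: it holds only plain data, so it is Send.
#[derive(Clone)]
struct FixedFactory(Vec<u8>);

impl URLFetcherFactory for FixedFactory {
    fn clone_box(&self) -> Box<dyn URLFetcherFactory> {
        Box::new(self.clone())
    }

    fn create(&self) -> Box<dyn URLFetcher> {
        Box::new(FixedFetcher(self.0.clone()))
    }
}
```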

And an implementation (notice that it has no fields and therefore has no trouble being Send):

```rust
#[derive(Clone, Debug)]
pub struct URLFetcherFactoryLive {}

impl URLFetcherFactory for URLFetcherFactoryLive {
    fn clone_box(&self) -> Box<dyn URLFetcherFactory> {
        Box::new(self.clone())
    }

    fn create(&self) -> Box<dyn URLFetcher> {
        // A fresh Core and Client per call, built on the calling (worker) thread.
        let core = Core::new().unwrap();
        let client = Client::new(&core.handle());
        Box::new(URLFetcherLive { client, core })
    }
}
```
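One detail the post leaves implicit: for `create` to return a URLFetcherLive at all, the URLFetcher trait presumably no longer requires `Send`, since an Rc-holding fetcher can never satisfy it. Only the factory needs `Send`. A std-only sketch under that assumption, with a hypothetical `RcFetcher` whose `Rc` stands in for the Core and Client:

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Assumed: no Send bound here, unlike the original URLFetcher trait.
pub trait URLFetcher {
    fn fetch(&mut self, raw_url: String) -> Result<Vec<u8>, String>;
}

pub trait URLFetcherFactory: Send {
    fn clone_box(&self) -> Box<dyn URLFetcherFactory>;
    fn create(&self) -> Box<dyn URLFetcher>;
}

// Hypothetical stand-in for URLFetcherLive; the Rc makes it !Send.
struct RcFetcher {
    requests_seen: Rc<RefCell<u32>>,
}

impl URLFetcher for RcFetcher {
    // Echoes the URL tagged with this fetcher's private request counter.
    fn fetch(&mut self, raw_url: String) -> Result<Vec<u8>, String> {
        let mut seen = self.requests_seen.borrow_mut();
        *seen += 1;
        Ok(format!("{}#{}", raw_url, *seen).into_bytes())
    }
}

// Field-free, so trivially Send.
#[derive(Clone, Debug)]
struct RcFetcherFactory;

impl URLFetcherFactory for RcFetcherFactory {
    fn clone_box(&self) -> Box<dyn URLFetcherFactory> {
        Box::new(self.clone())
    }

    fn create(&self) -> Box<dyn URLFetcher> {
        // Every call builds fresh non-Send state, like a new Core + Client.
        Box::new(RcFetcher { requests_seen: Rc::new(RefCell::new(0)) })
    }
}

fn assert_send<T: Send>() {}

fn assert_send_bounds() {
    assert_send::<RcFetcherFactory>();
    assert_send::<Box<dyn URLFetcherFactory>>();
}
```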

I clone the factory before moving it into each thread:

```rust
let factory_clone = self.url_fetcher_factory.clone_box();

workers.push(thread::Builder::new()
    .name(thread_name)
    .spawn(move || {
        work(&log, pool_clone, factory_clone, work_recv_clone);
    })
    .chain_err(|| "Failed to spawn thread")?);
```

Then, inside work, I just have the factory create a fresh fetcher:

```rust
let mut url_fetcher = url_fetcher_factory.create();
```
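Putting the pieces together, a std-only sketch of the master/worker flow with a pass-through factory, showing that every worker gets its own fetcher built from the same test data. `PassThrough`, `PassThroughFactory`, `run` and this simplified `work` are hypothetical stand-ins for the poster's code:

```rust
use std::collections::HashMap;
use std::thread;

pub trait URLFetcher {
    fn fetch(&mut self, raw_url: String) -> Result<Vec<u8>, String>;
}

pub trait URLFetcherFactory: Send {
    fn clone_box(&self) -> Box<dyn URLFetcherFactory>;
    fn create(&self) -> Box<dyn URLFetcher>;
}

// Hypothetical test fetcher serving prepopulated data.
struct PassThrough(HashMap<String, Vec<u8>>);

impl URLFetcher for PassThrough {
    fn fetch(&mut self, raw_url: String) -> Result<Vec<u8>, String> {
        self.0.get(&raw_url).cloned().ok_or_else(|| format!("no data for {}", raw_url))
    }
}

#[derive(Clone)]
struct PassThroughFactory(HashMap<String, Vec<u8>>);

impl URLFetcherFactory for PassThroughFactory {
    fn clone_box(&self) -> Box<dyn URLFetcherFactory> {
        Box::new(self.clone())
    }
    fn create(&self) -> Box<dyn URLFetcher> {
        Box::new(PassThrough(self.0.clone()))
    }
}

// Worker body: builds its own fetcher on its own thread, then fetches.
fn work(factory: Box<dyn URLFetcherFactory>, url: String) -> Result<Vec<u8>, String> {
    let mut url_fetcher = factory.create();
    url_fetcher.fetch(url)
}

// Master: clones the factory once per worker and moves the clone in.
fn run(factory: &dyn URLFetcherFactory, urls: Vec<String>) -> Vec<Result<Vec<u8>, String>> {
    let handles: Vec<_> = urls
        .into_iter()
        .map(|url| {
            let factory_clone = factory.clone_box();
            thread::spawn(move || work(factory_clone, url))
        })
        .collect();
    handles.into_iter().map(|h| h.join().expect("worker panicked")).collect()
}
```

Swapping in a live factory changes nothing on the worker side, which is the point of the abstraction.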

This gets me roughly the testability I want, while keeping the compiler happy.


#8

I think this is a classic approach to dependency injection for testing. I wonder whether there are other "built-in" ways or crates that achieve dependency injection without writing much code?