Struggling with parallelism and mutual references

First post here.

I've included an MRE below, but the problem is easy to explain in words: I want a framework that holds a hashmap of documents. The documents get gathered into that hashmap, but each document also needs a reference back to the framework.

After this the framework iterates through the map and processes each document in some way.

So far so OK. After a few months of Rust I have a primitive grasp of RefCell, and I came up with this:

use std::any::type_name_of_val;
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use anyhow::{Result, anyhow};
use camino::Utf8PathBuf;
use std::fmt::Debug;
use std::fmt::Formatter;

fn main() {
    let framework = Framework::new(&gather_closure);
    println!("framework: {:?} type {}", framework, type_name_of_val(&framework));
    framework.process_documents();
    println!("end of main");
}

// this gathers the Documents into the hashmap. This rather convoluted arrangement comes about due to the apparent need to use `new_cyclic`
fn gather_closure(pathbuf: Utf8PathBuf, framework: &Weak<Framework>) -> Result<HashMap<Utf8PathBuf, Box<dyn Document>>> {
    println!("gather closure called...");
    let mut hm: HashMap<Utf8PathBuf, Box<dyn Document>> = HashMap::new();
    for i in 0..10 {
        let td = DocumentStruct::new(&Weak::clone(framework), i);
        let _ = hm.insert(format!("bubbles {i}").into(), Box::new(td) as Box<dyn Document>);
    }
    Ok(hm)
}

#[derive(Debug)]
struct Framework {
    name: String,
    doc_map: RefCell<HashMap<Utf8PathBuf, Box<dyn Document>>>,
}

impl Framework {
    fn new(gather_closure: &dyn Fn(Utf8PathBuf, &Weak<Framework>) -> Result<HashMap<Utf8PathBuf, Box<dyn Document>>>) -> Arc<Self> {
        Arc::new_cyclic(|this| {
            Framework {
                name: "my_framework".into(),
                doc_map: {
                    let map = match gather_closure("some_path".into(), this){
                        Ok(map) => map,
                        Err(e) => panic!("error gathering files: {e}")
                    };
                    RefCell::new(map)
                },
            }
        })
    }

    fn process_documents(&self){
        for document in self.doc_map.borrow_mut().values_mut() {
            document.parse();
        }
    }
}

trait Document {
    fn parse(&self);
}
impl Debug for dyn Document {
    // Write to the formatter rather than printing directly to stdout
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        write!(f, "a document")
    }
}
#[derive(Debug)]
struct DocumentStruct {
    framework: Arc<Weak<Framework>>,
    i: usize,
}
impl Document for DocumentStruct {
    fn parse(&self){
        let framework_name = &self.framework.upgrade().unwrap().name;
        println!("parsing DocumentStruct {} in framework {framework_name}", self.i)
    }
}
impl DocumentStruct {
    fn new(framework: &Weak<Framework>, i: usize) -> Self {
        DocumentStruct{
            framework: framework.clone().into(),
            i
        }
    }
}

Desired improvement: this business of parsing each document is lengthy, so I want to implement parallelism. I have also spent months groping towards an understanding of how you implement parallelism in Rust. But my previous structure was more complicated, with no RefCell, and with a third data structure, to which both the framework and the documents had access. No Weak references were involved, etc. But the "intermediary" structure was very unnatural, not least because I (as a human thinker) know that the framework-documents structure poses no dereferencing or other dangers. It should be possible to implement this thing with just one Framework object and multiple Document objects.

So anyway, I introduce a scoped threadpool in Framework::process_documents(). I understand that Rc must now be switched to Arc, and std::rc::Weak to std::sync::Weak. I try to understand what the compiler's telling me and follow through. Then I get stuck at this point:

use std::any::type_name_of_val;
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use anyhow::{Result, anyhow};
use camino::Utf8PathBuf;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::{thread, time::Duration};
use scoped_thread_pool::*;


fn main() {
    let framework = Framework::new(&gather_closure);
    println!("framework: {:?} type {}", framework, type_name_of_val(&framework));
    framework.process_documents();
    println!("end of main");
}

fn gather_closure(pathbuf: Utf8PathBuf, framework: &Weak<Framework>) -> Result<HashMap<Utf8PathBuf, Arc<Box<dyn Document + Sync>>>> {
    println!("gather closure called...");
    let mut hm: HashMap<Utf8PathBuf, Arc<Box<dyn Document + Sync>>> = HashMap::new();
    for i in 0..10 {
        let arc_td = Arc::new(DocumentStruct::new(&Weak::clone(framework), i));
        let _ = hm.insert(format!("bubbles {i}").into(), Box::new(arc_td) as Arc<Box<dyn Document + Sync>>);
    }
    Ok(hm)
}

#[derive(Debug)]
struct Framework {
    name: String,
    doc_map: RefCell<HashMap<Utf8PathBuf, Arc<Box<dyn Document + Sync>>>>,
}

impl Framework {
    fn new(gather_closure: &dyn Fn(Utf8PathBuf, &Weak<Framework>) -> Result<HashMap<Utf8PathBuf, Arc<Box<dyn Document + Sync>>>>) -> Arc<Self> {
        Arc::new_cyclic(|this| {
            Framework {
                name: "my_framework".into(),
                doc_map: {
                    let map = match gather_closure("some_path".into(), this){
                        Ok(map) => map,
                        Err(e) => panic!("error gathering files: {e}")
                    };
                    RefCell::new(map)
                },
            }
        })
    }

    fn process_documents(&self){
        let n_virtual_cores: usize = std::thread::available_parallelism().unwrap().into();
        let scoped_threadpool = Pool::new(n_virtual_cores);
        let _ = scoped_threadpool.scoped(|scope| {
            for document in self.doc_map.borrow_mut().values_mut() {
                scope.execute(move ||{
                    document.parse()
                })
            }
        });
    }
}

trait Document {
    fn parse(&self);
}
impl Debug for dyn Document {
    // Write to the formatter rather than printing directly to stdout
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        write!(f, "a document")
    }
}
#[derive(Debug)]
struct DocumentStruct {
    framework: Arc<Weak<Framework>>,
    i: usize,
}
impl Document for DocumentStruct {
    fn parse(&self){
        let framework_name = &self.framework.upgrade().unwrap().name;
        println!("parsing DocumentStruct {} in framework {framework_name}...", self.i);
        thread::sleep(Duration::from_millis(1000));
        println!("... DocumentStruct {} finished parsing", self.i);

    }
}
impl DocumentStruct {
    fn new(framework: &Weak<Framework>, i: usize) -> Self {
        DocumentStruct{
            framework: framework.clone().into(),
            i
        }
    }
}

This gives the following errors:

error[E0605]: non-primitive cast: `Box<Arc<DocumentStruct>>` as `Arc<Box<dyn Document + Sync>>`
  --> src\main.rs:28:58
   |
28 |         let _ = hm.insert(format!("bubbles {i}").into(), Box::new(arc_td) as Arc<Box<dyn Document + Sync>>);
   |                                                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ an `as` expression can only be used to convert between primitive types or to coerce to a specific trait object

error[E0277]: `(dyn Document + Sync + 'static)` doesn't implement `Debug`
  --> src\main.rs:36:5
   |
33 | #[derive(Debug)]
   |          ----- in this derive macro expansion
...
36 |     doc_map: RefCell<HashMap<Utf8PathBuf, Arc<Box<dyn Document + Sync>>>>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `(dyn Document + Sync + 'static)` cannot be formatted using `{:?}` because it doesn't implement `Debug`
   |
   = help: the trait `Debug` is not implemented for `(dyn Document + Sync + 'static)`
   = help: the following other types implement trait `Debug`:
             (dyn Document + 'static)
             (dyn Any + 'static)
             (dyn Any + Send + 'static)
             (dyn Any + Send + Sync + 'static)
   = note: this error originates in the derive macro `Debug` (in Nightly builds, run with -Z macro-backtrace for more info)

error[E0277]: `(dyn Document + Sync + 'static)` cannot be sent between threads safely
   --> src\main.rs:66:31
    |
66  |                   scope.execute(move ||{
    |  _______________________-------_^
    | |                       |
    | |                       required by a bound introduced by this call
67  | |                     document.parse()
68  | |                 })
    | |_________________^ the trait `Send` is not implemented for `Unique<(dyn Document + Sync + 'static)>`
    |
    = note: the trait bound `Unique<(dyn Document + Sync + 'static)>: Send` is not satisfied
    = note: required for `Unique<(dyn Document + Sync + 'static)>` to implement `Send`
note: required because it appears within the type `Box<(dyn Document + Sync + 'static)>`
   --> D:\apps\rust\rust_1.70.0\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\alloc\src\boxed.rs:195:12
    |
195 | pub struct Box<
    |            ^^^
    = note: required for `Arc<Box<(dyn Document + Sync + 'static)>>` to implement `Send`
    = note: required because it appears within the type `&mut Arc<Box<(dyn Document + Sync + 'static)>>`
note: required because it's used within this closure
   --> src\main.rs:66:31
    |
66  |                 scope.execute(move ||{
    |                               ^^^^^^^
note: required by a bound in `scoped_thread_pool::Scope::<'scope>::execute`
   --> D:\apps\rust\rust_1.70.0\.cargo\registry\src\index.crates.io-6f17d22bba15001f\scoped-thread-pool-1.0.4\src\lib.rs:325:23
    |
323 |     pub fn execute<F>(&self, job: F)
    |            ------- required by a bound in this associated function
324 |     where
325 |         F: FnOnce() + Send + 'scope,
    |                       ^^^^ required by this bound in `Scope::<'scope>::execute`
help: consider borrowing here
    |
66  |                 scope.execute(&move ||{

It's not that these errors are entirely baffling, more that I don't know where to go from here.

You have a Box<Arc<...>> and you're trying to cast it to an Arc<Box<...>>, but that will never work (notice how the two wrappers are swapped). Maybe you made a typo in the casting code?

As a sidenote, you probably don't need the Box there, as you can have a trait object in an Arc.
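
A minimal sketch of that sidenote, using only the standard library. The `gather` helper and the `String` keys are made up for illustration, and `parse` returns a `String` here (the original returns nothing) purely so the result is easy to inspect:

```rust
use std::collections::HashMap;
use std::sync::Arc;

trait Document {
    fn parse(&self) -> String;
}

struct DocumentStruct {
    i: usize,
}

impl Document for DocumentStruct {
    fn parse(&self) -> String {
        format!("parsed document {}", self.i)
    }
}

// Hypothetical helper: the map values are `Arc<dyn Document>`, with no `Box` layer.
fn gather(n: usize) -> HashMap<String, Arc<dyn Document>> {
    let mut map = HashMap::new();
    for i in 0..n {
        // The type annotation makes `Arc<DocumentStruct>` coerce to `Arc<dyn Document>`.
        let doc: Arc<dyn Document> = Arc::new(DocumentStruct { i });
        map.insert(format!("bubbles {i}"), doc);
    }
    map
}

fn main() {
    let map = gather(3);
    println!("{}", map["bubbles 1"].parse());
}
```

The coercion happens at the annotated `let`, so no `as` cast is needed at all.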

Great pointers! Working on it...

Arc and Box are distinct types, and it is not possible to cast between them. You must explicitly wrap the boxed type in an Arc if you literally want Arc<Box<T>>:

        let arc_td = Arc::new(Box::new(DocumentStruct::new(&Weak::clone(framework), i)) as Box<dyn Document + Sync>);
        let _ = hm.insert(format!("bubbles {i}").into(), arc_td);

(SkiFire's suggestion of Arc<dyn Document> is much better.)

Fixing that will lead you to the next problem: RefCell cannot be shared between threads. You need the multithreaded version of interior mutability: Mutex (or RwLock if it's more appropriate).
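
To illustrate the RefCell-to-Mutex swap in isolation, here is a small sketch. It assumes `std::thread::scope` in place of the `scoped_thread_pool` crate, and the `fill_concurrently` helper is hypothetical:

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread;

// Hypothetical helper: several threads insert into one shared map.
fn fill_concurrently(n_threads: usize) -> HashMap<usize, String> {
    // A `RefCell<HashMap<..>>` could not be shared like this, because `RefCell` is not `Sync`.
    let shared = Mutex::new(HashMap::new());
    thread::scope(|scope| {
        for i in 0..n_threads {
            let shared = &shared; // each thread borrows the same Mutex
            scope.spawn(move || {
                // `.lock().unwrap()` plays the role that `.borrow_mut()` played with RefCell
                shared.lock().unwrap().insert(i, format!("bubbles {i}"));
            });
        }
    });
    // All threads have finished, so the Mutex can be unwrapped again.
    shared.into_inner().unwrap()
}

fn main() {
    let map = fill_concurrently(4);
    println!("{} entries", map.len());
}
```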

Changing RefCell to Mutex and .borrow_mut() to .lock().unwrap() fixes that, but also reveals a new problem: You are sending the Arc<Box<dyn Document>> across threads, but the dyn Document is not marked Send. So replace dyn Document + Sync with dyn Document + Send + Sync everywhere, and then ...
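
As an aside, one way to avoid repeating `+ Send + Sync` everywhere is to put the bounds on the trait itself. This is an alternative sketch, not what the reply above does; `parse` returns a number here only so the result can be checked, and `parse_on_other_thread` is a hypothetical helper:

```rust
use std::sync::Arc;
use std::thread;

// Supertrait bounds: every implementor must be `Send + Sync`,
// so a plain `dyn Document` can cross thread boundaries.
trait Document: Send + Sync {
    fn parse(&self) -> usize;
}

struct DocumentStruct {
    i: usize,
}

impl Document for DocumentStruct {
    fn parse(&self) -> usize {
        self.i * 2
    }
}

// Hypothetical helper: moves the Arc into a new thread and parses there.
fn parse_on_other_thread(doc: Arc<dyn Document>) -> usize {
    thread::spawn(move || doc.parse()).join().unwrap()
}

fn main() {
    let doc = Arc::new(DocumentStruct { i: 21 });
    println!("{}", parse_on_other_thread(doc));
}
```

The trade-off is that a type which is not `Send + Sync` can then never implement `Document`, whereas the `dyn Document + Send + Sync` spelling leaves the trait itself unconstrained.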

You need to clone the inner Arc prior to spawning the thread:

        let _ = scoped_threadpool.scoped(|scope| {
            for document in self.doc_map.lock().unwrap().values_mut() { // Changed for `Mutex`
                let document: Arc<_> = document.clone(); // Added
                scope.execute(move ||{
                    document.parse()
                })
            }
        });
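
The same clone-before-spawn pattern can be sketched with the standard library alone. This assumes `std::thread::scope` (one thread per document) rather than a `scoped_thread_pool` pool, a `Document` trait with `Send + Sync` as supertraits, and a `parse` that returns a number so the outcome is observable:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

trait Document: Send + Sync {
    fn parse(&self) -> usize;
}

struct DocumentStruct {
    i: usize,
}

impl Document for DocumentStruct {
    fn parse(&self) -> usize {
        self.i
    }
}

// Hypothetical stand-in for `process_documents`: parses every document on its
// own scoped thread and sums the results.
fn process(map: &Mutex<HashMap<String, Arc<dyn Document>>>) -> usize {
    thread::scope(|scope| {
        let mut handles = Vec::new();
        for document in map.lock().unwrap().values() {
            // `document` is a `&Arc<..>` borrowed from the lock guard, which does
            // not live long enough for the spawned thread. Cloning the Arc gives
            // the closure an owned handle instead.
            let document = Arc::clone(document);
            handles.push(scope.spawn(move || document.parse()));
        }
        handles.into_iter().map(|h| h.join().unwrap()).sum::<usize>()
    })
}

fn main() {
    let mut map: HashMap<String, Arc<dyn Document>> = HashMap::new();
    for i in 1..=3 {
        map.insert(format!("bubbles {i}"), Arc::new(DocumentStruct { i }));
    }
    println!("sum = {}", process(&Mutex::new(map)));
}
```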

But at last, we come to a showstopper. The Mutex makes it impossible to parallelize with threads. You need a concurrent hashmap, like dashmap (but beware; it's deadlock hell). DashMap can replace Mutex<HashMap<_, _>>, and the call to values_mut() just needs to be replaced with iter_mut().

And finally, I think it all works the way you want? But the code still has what looks like a lot of cargo-culting, like Arc<Weak<T>> ... what's that all about? Just use Weak<T>, my dude!
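
For the `Weak<T>` point, a stripped-down sketch of the back-reference without the extra `Arc` wrapper might look like this. The `build` and `describe` names are made up for illustration, and the documents sit in a `Vec` rather than a hashmap to keep it short:

```rust
use std::sync::{Arc, Weak};

struct Framework {
    name: String,
    docs: Vec<DocumentStruct>,
}

struct DocumentStruct {
    framework: Weak<Framework>, // plain Weak, no Arc around it
    i: usize,
}

impl DocumentStruct {
    fn describe(&self) -> String {
        // `upgrade` yields `None` once the Framework has been dropped.
        match self.framework.upgrade() {
            Some(fw) => format!("document {} in {}", self.i, fw.name),
            None => format!("document {} (framework dropped)", self.i),
        }
    }
}

// Hypothetical constructor: `new_cyclic` hands out the Weak before the Arc exists.
fn build(n: usize) -> Arc<Framework> {
    Arc::new_cyclic(|this: &Weak<Framework>| Framework {
        name: "my_framework".to_string(),
        docs: (0..n)
            .map(|i| DocumentStruct { framework: Weak::clone(this), i })
            .collect(),
    })
}

fn main() {
    let framework = build(2);
    for doc in &framework.docs {
        println!("{}", doc.describe());
    }
}
```

`Weak<T>` is already cheap to clone, so wrapping it in another `Arc` adds an allocation and an indirection without buying anything.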

The kinda-sorta fixed project can be found here: docmap (gist.github.com)

@parasyte Perfect! Thanks so much.

I can also see this working if I bump up the number of documents to 30: with 8 cores on my machine, 4 seconds to complete with parallel processing. But if I comment out the threadpool stuff: 30 seconds.
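
Those timings match a back-of-the-envelope estimate. Assuming every parse takes exactly the 1 second of the `thread::sleep` call and ignoring scheduling overhead, the documents are processed in rounds of at most one per worker (`estimated_seconds` is a hypothetical helper):

```rust
// Rough estimate: documents run in rounds of `n_workers`, each round taking
// `secs_per_doc` seconds. Ignores thread start-up and scheduling overhead.
fn estimated_seconds(n_docs: u64, n_workers: u64, secs_per_doc: u64) -> u64 {
    // Ceiling division: 30 documents on 8 workers need 4 rounds.
    let rounds = (n_docs + n_workers - 1) / n_workers;
    rounds * secs_per_doc
}

fn main() {
    println!("parallel:   {} s", estimated_seconds(30, 8, 1));
    println!("sequential: {} s", estimated_seconds(30, 1, 1));
}
```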

Now for my own personal debrief: I'm going to examine how you did this in forensic detail... and also check out how it marries up with the more challenging compiler messages.