Process files with arbitrary functions in parallel with rayon

The Goal

I have a file tree and I have independent functions that would like to process that tree. I want something to orchestrate that processing, so that each file is opened once, and then all processing functions are given a chance to run on that file's content. I want this to happen in parallel with rayon.

What I have

This is working, but not quite as I want:

use std::fs;
use std::path::PathBuf;
use rayon::prelude::*;
use crossbeam::channel::{self, Receiver};

struct WorkTreeProcessor {
    file_processors: Vec<Box<Fn(&str) + Send + Sync>>,
}

impl WorkTreeProcessor {
    fn add_file_processor<C, F, R>(&mut self, context: C, function: F) -> Receiver<R>
    where
        C: Clone + Send + Sync + 'static,
        F: Fn(&C, &str) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        let (sender, receiver) = channel::unbounded();
        self.file_processors
            .push(Box::new(move |file_content| {
                let result = function(&context, file_content);
                let _ = sender.send(result);
            }));
        receiver
    }

    fn run(&mut self, work_tree_paths: Vec<PathBuf>) {
        work_tree_paths.into_par_iter().for_each(|each_path| {
            if let Ok(file_content) = fs::read_to_string(each_path) {
                for each in self.file_processors.iter() {
                    each(&file_content);
                }
            }
        });
    }
}

What I want

Right now the context passed into each function is a reference. I want it to be a mutable reference. I can't figure out how to make that work.

My general plan is:

  1. Those contexts are clonable.
  2. If I could clone a context into each rayon thread, then I could pass it in as mutable.
  3. Rayons for_each_with method provides a mechanism to clone values into each rayon thread and pass them back to the iterator function.

If I could somehow make the file_processors Clonable then I think that would work. I would move the context into each function. And pass the functions into for_each_with so they and their context would get cloned as needed. But I can't make the functions clone because they are stored as trait objects.

I've also tried wrapping up each function with it's context into a separate clonable object:

#[derive(Clone)]
struct FunctionWithContext<C: Clone> {
    function: Arc<Fn(&mut C)>,
    context: C,
}

that could then be passed into for_each_with. But then when I try to store these as the new file_processors I need to fill in the template, and I get locked into only supporting one type of context.

Anyway I hope that explains my goal. How do I get this to work?

Thanks,
Jesse

Right after posting I started more searching and found discussion of box_clone to work around cloning for trait objects. I "think" this can help me get what I want. This is probably the third time I've gotten stuck with related code and then re-found/remembered the box_clone workaround. My memory's not so good I guess.

Anyway if you see a quick solution to the above please let me know. Otherwise I'll see if I can get things working using box_clone technique. Will report back.

What is wrong with just;

let mut context = context.clone();
let result = function(&mut context, file_content);

or

let result = function(context.clone(), file_content);

If your after shared external mutability then need to be using Arc<Mutex but your description does not clearly give this indication.

I think the problem is that it would be relatively inefficient. In that case context will be cloned once for each processed path. My goal is to have the context cloned once per rayon thread.

My common use case for context is to hold a scratch buffer to use when processing the file so that the processing function can avoid allocation.

Rayon supports this general behavior pretty easily with for_each_with for a specific closure. Here's an example of that:

let buffer = String::with_capacity(100);
paths.into_par_iter().for_each_with(buffer, |buffer, each_path| {
    if let Ok(file_content) = fs::read_to_string(each_path) {
        // buffer is read/write
        // buffer is only cloned when needed (once per rayon thread)
        // then buffer is passed into this closure where is can be used while processing file_content
    }
});

The trouble I'm having is how to make the above case more generic so that it can be used with my previous WorkTreeProcessor code. Ie many different typed closures each that take the same file_content for processing, but have different context types.

More progress...

At a high level this code now works just as I want I think. The blockage that I kept running into was trying to clone a trait object, and once I found that was possible I go through the problem.

In particular:

  1. Arbitrary functions process file content that's loaded in parallel using rayon.
  2. These functions get to decide the type they return
  3. These functions get to decide the mutable context they are passed
  4. The mutable contexts are only cloned as needed (once per rayon thread)

My only remaining question is can the code be cleaned up/simplified?

use std::fs;
use std::sync::Arc;
use std::path::PathBuf;
use rayon::prelude::*;
use crossbeam::channel::{self, Receiver, Sender};

trait FileProcessor: FileProcessorClone + Send {
    fn process_file_content(&mut self, file_content: &str);
}

trait FileProcessorClone {
    fn clone_box(&self) -> Box<FileProcessor>;
}

impl<T> FileProcessorClone for T
where
    T: 'static + FileProcessor + Clone,
{
    fn clone_box(&self) -> Box<FileProcessor> {
        Box::new(self.clone())
    }
}

impl Clone for Box<FileProcessor> {
    fn clone(&self) -> Box<FileProcessor> {
        self.clone_box()
    }
}

#[derive(Clone)]
struct FunctionFileProcessor<C, R> where C: Clone + 'static, R: Clone + Send + 'static {
    context: C,
    function: Arc<Fn(&mut C, &str) -> R + Send + Sync + 'static>,
    results: Sender<R>,
    completed: bool,
}

impl<C, R> FileProcessor for FunctionFileProcessor<C, R> where C: Clone + Send + 'static, R: Clone + Send + 'static {
    fn process_file_content(&mut self, file_content: &str) {
        if !self.completed {
            let result = (self.function)(&mut self.context, file_content);
            if self.results.send(result).is_err() {
                self.completed = true;
            }
        }
    }
}

struct WorkTreeProcessor {
    file_processors: Vec<Box<FileProcessor>>,
}

impl WorkTreeProcessor {
    fn add_file_processor<C, F, R>(&mut self, context: C, function: F) -> Receiver<R>
    where
        C: Clone + Send + Sync + 'static,
        F: Fn(&mut C, &str) -> R + Send + Sync + 'static,
        R: Send + Clone + 'static,
    {
        let (sender, receiver) = channel::unbounded();
        self.file_processors.push(Box::new(FunctionFileProcessor {
            context,
            results: sender,
            completed: false,
            function: Arc::new(function),
        }));
        receiver
    }

    fn run(&mut self, work_tree_paths: Vec<PathBuf>) {        
        let file_processors = self.file_processors.clone();
        work_tree_paths.into_par_iter().for_each_with(file_processors, |file_processors, each_path| {
            if let Ok(file_content) = fs::read_to_string(each_path) {
                for each in file_processors.iter_mut() {
                    each.process_file_content(&file_content);
                }
            }
        });
    }
}

Here is my cleaned version of your code:

#![deny(bare_trait_objects)]

use ::std::{*,
    fs,
    path::PathBuf,
    sync::Arc,
};
use ::rayon::prelude::*;
use ::crossbeam::channel::{
    self,
    Receiver,
    Sender,
};

macro_rules! trait_aliases {
  ...
}

trait FileProcessor : FileProcessorClone + Send {
    fn process_file_contents (&mut self, file_contents: &str);
}

trait FileProcessorClone {
    fn clone_box (&self) -> Box<dyn FileProcessor>;
}

impl<T> FileProcessorClone for T
where
    T : FileProcessor + Clone + 'static,
{
    fn clone_box (&self) -> Box<dyn FileProcessor>
    {
        Box::new(self.clone())
    }
}

impl Clone for Box<dyn FileProcessor> {
    fn clone (&self) -> Box<dyn FileProcessor>
    {
        self.clone_box()
    }
}

trait_aliases! {
    trait alias ContextBounds = {
        Send + 'static +
        Clone
    };
    
    trait alias RetBounds = {
        Send + 'static
    };
    
    trait alias FunctionBounds(Context, Ret) = {
        Send + Sync + 'static +
        Fn(&mut Context, &str) -> Ret
    } where {
        Context : ContextBounds,
        Ret : RetBounds,
    };
}

struct FunctionFileProcessor<Context, Ret>
where
    Context : ContextBounds,
    Ret : RetBounds,
{
    context: Context,
    function: Arc<dyn FunctionBounds<Context, Ret>>,
    results: Sender<Ret>,
    completed: bool,
}

// #[derive(Clone)] is dumb (no need for Ret to be Clone)
impl <Context, Ret> Clone for FunctionFileProcessor<Context, Ret>
where
    Context : ContextBounds,
    Ret : RetBounds,
{
   ...
}

impl<Context, Ret> FileProcessor
    for FunctionFileProcessor<Context, Ret>
where
    Context : ContextBounds,
    Ret : RetBounds,
{
    fn process_file_contents (&mut self, file_contents: &str)
    {
        if self.completed {
            return;
        }
        let result = (&*self.function)(
            &mut self.context,
            file_contents,
        );
        self.completed = self.results.send(result).is_err();
    }
}

struct WorkTreeProcessor {
    file_processors: Vec<Box<dyn FileProcessor>>,
}

impl WorkTreeProcessor {
    fn add_file_processor<Context, Ret, F> (
        &mut self,
        context: Context,
        function: F,
    ) -> Receiver<Ret>
    where
        Context : ContextBounds,
        Ret : RetBounds,
        F : FunctionBounds<Context, Ret>,
    {
        let (sender, receiver) = channel::unbounded();
        self.file_processors.push(Box::new(FunctionFileProcessor {
            context,
            results: sender,
            completed: false,
            function: Arc::new(function),
        }));
        receiver
    }

    fn run (&self, work_tree_paths: Vec<PathBuf>)
    {
        let file_processors = self.file_processors.clone();
        work_tree_paths
            .into_par_iter()
            .filter_map(|path| fs::read_to_string(path).ok())
            .for_each_with(
                file_processors,
                |file_processors, ref file_contents| {
                    file_processors
                        .iter_mut()
                        .for_each(|file_processor| {
                            file_processor
                                .process_file_contents(file_contents)
                        })
                },
            );
    }
}
  • Playground

  • the most important thing, imho, was to annotate the trait objects with the dyn keyword; hence the added lint that errors when it is not done;

  • I have used "trait aliases" to avoid the code repetition on most bounds; this way if you realize you need / no longer need some bound, then all you have to do is change it at the trait alias definition.

  • Note: given the cloning, the completed boolean may be true even if the sender has already errored, which could lead to a function being called only for its result to be discarded right away. Not really an issue per se, but something to be aware of.

  • run does not need a unique borrow on self.

2 Likes

Thank you for your help and notes.

1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.