Parallel reference processing with mutation

I would like to parallelise a program that can be approximated by the following toy example (playground).
It splits a text into phrases, then splits each phrase into words, and outputs an increasing number of words for each successive phrase:

use rayon::prelude::*;
fn main() {
    let mut len = 0;

    let i = String::from("To be. Or not to be. That is the question");
    let phrases = i.split(".");
    let iter = phrases
        //.par_bridge()
        .map(|p: &str| p.split_whitespace().collect::<Vec<_>>());
    let iter = iter.map(|p: Vec<&str>| {
        len += 1;
        p.into_iter().take(len).collect::<Vec<_>>()
    });
    iter.for_each(|p| println!("{:?}", p))
}

This works fine and prints:

["To"]
["Or", "not"]
["That", "is", "the"]

Now I would like to parallelise the splitting into words using Rayon.
However, when I uncomment the call to par_bridge, I get an error that I "cannot assign to len, as it is a captured variable in a Fn closure". Fair enough.
My go-to solution in such scenarios is to create a channel and to send things through it (playground):

use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (tx, rx) = channel();

    thread::spawn(move || {
        let i = String::from("To be. Or not to be. That is the question");
        let phrases = i.split(".");
        let iter = phrases.map(|p: &str| p.split_whitespace().collect::<Vec<_>>());
        iter.for_each(|i: Vec<&str>| tx.send(i).unwrap())
    });
}

However, here I get the error that i does not live long enough. That makes sense: When sending a Vec<&str> through the channel, nothing ensures here that the &str references live long enough. (In this toy example, I could circumvent the error by omitting String::from and thus making i: &'static str, but in my actual application, I cannot.)

I thought about solving this problem in two ways, both of which have significant downsides:
The first solution would be to collect into Vec<String>; however, converting every &str to String significantly increases runtime (I have a very large number of &strs in my actual application).
The second solution would be to move every &str of Vec<&str> into an arena to ensure that the references live long enough; however, this significantly increases memory consumption.
Do you know of other ways to solve this problem?

If you collect the phrases into Vec<&str> first you can .enumerate() on the parallel iterator.

    let phrases: Vec<_> = i.split('.').collect();
    let v: Vec<_> = phrases
        .par_iter()
        .copied()
        .map(|p: &str| p.split_whitespace().collect::<Vec<_>>())
        .enumerate()
        .map(|(len, p)| p.into_iter().take(len + 1).collect::<Vec<_>>())
        .collect();

Thank you for your reply. While I could do that, it would make the data processing non-lazy, which is undesirable in my case. (It would --- sometimes considerably --- increase the time until showing the first result.)

I'm after a solution that compromises on none of concurrency, CPU usage, RAM usage, or laziness.

What does "lazy" mean in this context? I don't see anything lazy in your examples.

Ah, sorry. My input data is usually loaded from files, which can be of arbitrary size. In case of loading from standard input, this could be potentially infinite. Collecting into a Vec would not terminate in such a case.

You're not forced to .collect() as final step. Replacing it with .for_each(|p| println!("{:?}", p)) gives you the same semantics as the first snippet in your original post

Even without collecting in the final step, Hyeonu's example collects in the first step, which suffices to make data processing non-lazy. And without collect as first step, I think that we cannot use enumerate.
Anyway, even if using enumerate was possible, I just used len as an example of a mutable variable. My actual mutable data is more complex --- it is more like a mutable HashSet of the first words of all phrases processed so far.

I made another example to better explain my use case:
It takes a string as input, where every phrase is a list of dependencies terminated by a target.
The program succeeds if all dependencies are previously introduced targets.

use rayon::prelude::*;
fn main() {
    let mut targets = std::collections::HashSet::new();
    let input = String::from("foo. foo bar. bar foo baz. bar snafu. nothing");
    let phrases = input.split(".");
    let iter = phrases
        //.par_bridge()
        .map(|p: &str| p.split_whitespace().collect::<Vec<_>>());
    for mut p in iter {
        match p.pop() {
            Some(target) if p.iter().all(|dep| targets.contains(dep)) => {
                println!("{} depends on: {:?}", target, p);
                targets.insert(target);
            }
            _ => {
                println!("Error!");
                return;
            }
        }
    }
}

The output is:

foo depends on: []
bar depends on: ["foo"]
baz depends on: ["bar", "foo"]
snafu depends on: ["bar"]
nothing depends on: []

Is it possible to make the "parsing" of phrases concurrent, without introducing significant overhead (such as latency before displaying an error, CPU usage, RAM usage) compared to the sequential version?

Oops, didn't see that.

I think you can .enumerate() before the .par_bridge(). Still, looks like that wasn't the actual problem.

The for mut p in iter loop is inherently sequential, so you can't parallelize it. What you could do is spawn another thread, run the parallel loop in that thread (so that it doesn't block the initial thread, which can then proceed to the for loop), and send the results back to the initial thread through a channel. My guess is that a bounded channel is what you want here, to stop an infinite iterator from running too far ahead of the consumer, but you should benchmark it anyway.

What you are describing is precisely what I had in mind. But I cannot send the results from the parallel loop with a channel, because my results contain references. My question was whether there was a way to avoid converting the references (&str) to owned types (String), because this comes with a significant overhead.

You could try scoped threads, since they should allow the spawned threads to capture references with local (non-'static) lifetimes.
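To make the scoped-thread suggestion concrete, here is a minimal sketch using only the standard library: std::thread::scope (stable since Rust 1.63) together with a bounded std::sync::mpsc::sync_channel. The function name check, the returned list of accepted targets, and the channel capacity of 16 are assumptions made for illustration and are not taken from the thread. The sketch parses on a single producer thread; fanning the parsing out over several workers (for instance with Rayon) would additionally need some way to restore phrase order, which is not shown here.

```rust
use std::collections::HashSet;
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical helper: returns the accepted targets in order, or None as
/// soon as a phrase is empty or uses a dependency that is not yet a target.
fn check<'a>(input: &'a str) -> Option<Vec<&'a str>> {
    // Bounded channel: the producer blocks once 16 parsed phrases are
    // waiting (16 is an arbitrary, illustrative capacity).
    let (tx, rx) = sync_channel::<Vec<&'a str>>(16);

    thread::scope(|s| {
        // Producer: borrows `input` without needing a 'static lifetime,
        // because the scope joins this thread before `check` returns.
        s.spawn(move || {
            for phrase in input.split('.') {
                let words: Vec<&str> = phrase.split_whitespace().collect();
                // `send` fails once the receiver is dropped; stop parsing.
                if tx.send(words).is_err() {
                    break;
                }
            }
        });

        // Consumer: sequential, and the sole owner of the mutable state.
        let mut targets: HashSet<&str> = HashSet::new();
        let mut order = Vec::new();
        for mut p in rx {
            match p.pop() {
                Some(target) if p.iter().all(|dep| targets.contains(dep)) => {
                    targets.insert(target);
                    order.push(target);
                }
                // Returning drops the receiver, which unblocks the producer.
                _ => return None,
            }
        }
        Some(order)
    })
}

fn main() {
    // A non-'static String, as in the original question.
    let input = String::from("foo. foo bar. bar foo baz. bar snafu. nothing");
    match check(&input) {
        Some(order) => println!("accepted targets: {:?}", order),
        None => println!("Error!"),
    }
}
```

No &str is converted to String here: the Vec<&str> values sent through the channel still point into input. Because input.split is lazy and the channel is bounded, the producer can run at most 16 phrases (plus the one it is currently sending) ahead of the consumer, so the first result or error is available without reading the whole input.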
