How to read all messages from a channel

I am trying to run some work on threads in parallel and then use the results.
This is my code:

Crates used:

use scoped_threadpool::Pool;
use std::sync::mpsc::channel;

Code:

    let mut pool = Pool::new(2);
    let (tx, rx) = channel();

    pool.scoped(|scope| {
        for hostname_port in hostname_port_vec {
            let tx = tx.clone();
            scope.execute(move || {
                let detail_snapshot_time = Local::now();
                let version = read_version(&hostname_port);
                tx.send((hostname_port, detail_snapshot_time, version)).expect("channel will be waiting in the pool");
            });
        }
        scope.join_all();
    });
    drop(tx);
    let mut stored_versions: Vec<StoredVersionData> = Vec::new();
    while let (hostname_port, detail_snapshot_time, version) = rx.recv().unwrap() {
        dbg!(add_to_version_vector(version, hostname_port, detail_snapshot_time, &mut stored_versions));
    }
    //for (hostname_port, detail_snapshot_time, version) in rx.try_iter().next() {
    //    dbg!(add_to_version_vector(version, hostname_port, detail_snapshot_time, &mut stored_versions));
    // }

  1. My intention is to create 2 threads for work performed.
  2. The threads send their record '(hostname_port, detail_snapshot_time, version)' into a channel.
  3. I expect scope.join_all() to let the threads finish.
  4. The current while construction throws an error.
  5. The commented-out for loop only picks up a single record (?)

Is there anything that I can improve or should improve?

Thanks!

If you remove .try_iter().next() it should just work

So join_all() guarantees that the threads have finished (just to be sure)?
Do I need to call drop(tx) to guarantee that no more messages are coming?

I don't know the scoped_threadpool crate, I would think the join_all is not mandatory to call (else why would it be passed as a closure argument, since that's the whole point of this thread scoping pattern).

pool.scoped(|scope| {
    let (tx, rx) = channel();
    for hostname_port in hostname_port_vec {
        let tx = tx.clone();
        scope.execute(move || {
            let detail_snapshot_time = Local::now();
            let version = read_version(&hostname_port);
            tx.send((hostname_port, detail_snapshot_time, version)).expect("channel will be waiting in the pool");
        });
    }
    drop(tx);
    for (hostname_port, detail_snapshot_time, version) in rx {
        dbg!(add_to_version_vector(version, hostname_port, detail_snapshot_time, &mut stored_versions));
    }
});

pool.scoped() blocks until all the jobs started with scope.execute() are done. So you shouldn't put your collecting loop after it.
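To illustrate why drop(tx) matters, here is a minimal std-only sketch (no thread pool crate involved). The function name `collect_squares` and the squaring "work" are made up for the example. Iterating over the receiver only ends once every sender, including the original `tx`, has been dropped.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Spawns `workers` threads that each send one value, then collects everything.
/// (Hypothetical helper, only meant to show the channel-closing behaviour.)
fn collect_squares(workers: u32) -> Vec<u32> {
    let (tx, rx) = channel();
    let mut handles = Vec::new();

    for i in 0..workers {
        // Each worker owns its own clone of the sender.
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            tx.send(i * i).expect("receiver is still alive");
            // The clone is dropped here, when the closure returns.
        }));
    }

    // Drop the original sender. Without this line the loop below would
    // block forever, because one sender would still be alive.
    drop(tx);

    // Iterating the receiver yields messages until all senders are gone.
    let mut results: Vec<u32> = rx.into_iter().collect();

    for handle in handles {
        handle.join().expect("worker panicked");
    }

    // Arrival order depends on scheduling, so sort for a stable result.
    results.sort();
    results
}

fn main() {
    println!("{:?}", collect_squares(4));
}
```

Because the receive loop runs while the workers are still sending, nothing has to wait for a join before the results can be consumed.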

BTW, that crate seems quite old and unmaintained, you could use rayon::scope or the soon to be stable std::thread::scope.
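For reference, a rough sketch of the same pattern with std::thread::scope (available in stable Rust since 1.63). `read_version` here is a hypothetical stand-in that just formats a string, and the record is reduced to a (host, version) pair. Note that this spawns one thread per host and does not limit the number of threads the way a pool does.

```rust
use std::sync::mpsc::channel;
use std::thread;

// Hypothetical stand-in for the real `read_version` from the question.
fn read_version(hostname_port: &str) -> String {
    format!("version-of-{hostname_port}")
}

fn read_all_versions(hosts: &[&str]) -> Vec<(String, String)> {
    let mut stored = Vec::new();

    thread::scope(|scope| {
        // Channel created inside the scope, next to the threads that use it.
        let (tx, rx) = channel();

        for &hostname_port in hosts {
            let tx = tx.clone();
            // Scoped threads may borrow `hostname_port` from the caller.
            scope.spawn(move || {
                let version = read_version(hostname_port);
                tx.send((hostname_port.to_string(), version))
                    .expect("receiver is alive inside the scope");
            });
        }

        // Only the clones held by the workers remain after this.
        drop(tx);

        // Collect while the workers are still running.
        for record in rx {
            stored.push(record);
        }
    });

    // Arrival order is not deterministic, so sort for a stable result.
    stored.sort();
    stored
}

fn main() {
    for (host, version) in read_all_versions(&["host1:9000", "host2:9000"]) {
        println!("{host}: {version}");
    }
}
```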


Thank you for your help!

What do I need to change to use rayon?
I am pretty much a newbie and it's not easy to understand how to do that.

Why is try_iter().next() used here, whilst previously it only worked without?

Just a copy paste mistake, sorry.

Remove scoped_threadpool from your Cargo.toml and add rayon.

    let pool = rayon::ThreadPoolBuilder::new().num_threads(*parallel).build().unwrap();
    let (tx, rx) = channel();

    pool.scope(|scope| {
        for hostname_port in hostname_port_vec {
            let tx = tx.clone();
            scope.execute(move || {
                let detail_snapshot_time = Local::now();
                let version = read_version(&hostname_port);
                tx.send((hostname_port, detail_snapshot_time, version)).expect("channel will be waiting in the pool");
            });
        }
    });
    drop(tx);
    let mut stored_versions: Vec<StoredVersionData> = Vec::new();
    for (hostname_port, detail_snapshot_time, version) in rx {
        add_to_version_vector(version, hostname_port, detail_snapshot_time, &mut stored_versions);
    }

Throws:

error[E0599]: no method named `execute` found for reference `&Scope<'_>` in the current scope
  --> src/versions.rs:85:19
   |
85 |             scope.execute(move || {
   |                   ^^^^^^^ method not found in `&Scope<'_>`

They don't have the same API.

The analogous method is called spawn.

I am afraid it's not that simple:

    let pool = rayon::ThreadPoolBuilder::new().num_threads(*parallel).build().unwrap();
    let (tx, rx) = channel();

    pool.scope(|scope| {
        for hostname_port in hostname_port_vec {
            let tx = tx.clone();
            scope.spawn(move |_| {
                let detail_snapshot_time = Local::now();
                let version = read_version(&hostname_port);
                tx.send((hostname_port, detail_snapshot_time, version)).expect("channel will be waiting in the pool");
            });
        }
    });
    drop(tx);
    let mut stored_versions: Vec<StoredVersionData> = Vec::new();
    for (hostname_port, detail_snapshot_time, version) in rx {
        add_to_version_vector(version, hostname_port, detail_snapshot_time, &mut stored_versions);
    }

Now complains:

error[E0277]: `Sender<(&&str, DateTime<Local>, VersionData)>` cannot be shared between threads safely
  --> src/versions.rs:82:10
   |
82 |     pool.scope(|scope| {
   |          ^^^^^ `Sender<(&&str, DateTime<Local>, VersionData)>` cannot be shared between threads safely
   |
   = help: the trait `Sync` is not implemented for `Sender<(&&str, DateTime<Local>, VersionData)>`
   = note: required because of the requirements on the impl of `Send` for `&Sender<(&&str, DateTime<Local>, VersionData)>`
   = note: required because it appears within the type `[closure@src/versions.rs:82:16: 91:6]`

I'm so used to crossbeam's channel that I forgot: the standard library's Sender isn't Sync, so you have to move it into the closure. The simpler solution, though, is to create the channel inside the pool scope.

- let (tx, rx) = channel();
pool.scope(|scope| {
+    let (tx, rx) = channel();
    ...
}

rx should be inside the pool scope. Else the thread pool is useless.


To be honest, this is really confusing. I try to learn and to understand.

Why does the setup with the scoped_threadpool work, and then throw an error about scope with rayon?
Is it sensible to use rayon (and why not use crossbeam)?
Why would using scoped_threadpool not be a good idea, and is its age a consideration?

You're right; I'll admit, I haven't worked with multithreaded code much.

If you can make a minimal playground example, it'll be much quicker to fix it and share it back to you without this much back and forth.

Just comparing the level of documentation between the two crates should already tell you a lot. rayon is an actively maintained "infrastructure" crate used by almost every Rust user. scoped_threadpool was made when Rust was barely at 1.0 (and was completely unknown to me until today).

Don't get me wrong: I am very grateful to get help, I really am.
But I get the feeling that I am being pointed to individual changes that might each be an improvement in itself, but that get me into trouble (compile errors) when I apply them, and I know too little to understand the full picture.

There are a few pages of documentation which can help you here.

The Book's section about the Send and Sync traits here and their own respective pages.

And hopefully this playground can help you

Thank you for your help. It was a struggle for me, in the sense that what I tried to do was not documented clearly and simply enough, and the help came in bits and pieces. It was a great learning experience.

What I tried to do is, I think, very simple: I wanted to parallelize the part of the code that I knew was taking a lot of time. That looked easy because the slow call was made in a loop, so all I needed to do was parallelize the calls in that loop.
However, I wanted to limit the number of threads, so as not to over-allocate threads and diminish the usefulness of the threading.

I have rewritten my code in the following way, where I simplified it a bit: Rust Playground
It seems that the playground does not allow outbound network calls, so I replaced my reqwest call with a string definition.

Please let me know if there's anything that can be done better or simpler.
Again: thank you!
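For comparison, the goal described above (a loop of slow calls, run on a limited number of threads) can be sketched with the standard library alone. This is not the playground code. `fetch` is a hypothetical stand-in for the slow call, `fetch_limited` is a made-up name, and the shared atomic index is just one possible way to hand out work.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::thread;

// Hypothetical stand-in for the slow call (e.g. an HTTP request).
fn fetch(hostname_port: &str) -> usize {
    hostname_port.len()
}

/// Runs `fetch` for every host on at most `parallel` worker threads.
fn fetch_limited(hosts: &[&str], parallel: usize) -> Vec<(String, usize)> {
    // Index of the next host that no worker has claimed yet.
    let next = AtomicUsize::new(0);
    let (tx, rx) = channel();

    thread::scope(|scope| {
        // Always start at least one worker, even if `parallel` is 0.
        for _ in 0..parallel.max(1) {
            let tx = tx.clone();
            let next = &next;
            scope.spawn(move || loop {
                // Claim the next unprocessed index; stop when none are left.
                let i = next.fetch_add(1, Ordering::Relaxed);
                let Some(&host) = hosts.get(i) else { break };
                tx.send((host.to_string(), fetch(host)))
                    .expect("receiver outlives the scope");
            });
        }
    });

    // All workers are done; drop the last sender so the iterator can end.
    drop(tx);

    // The channel is unbounded, so every message is buffered and waiting here.
    let mut results: Vec<_> = rx.into_iter().collect();
    results.sort();
    results
}

fn main() {
    println!("{:?}", fetch_limited(&["host1:9000", "host2:9000", "host3:9000"], 2));
}
```

The number of threads is fixed by `parallel`, regardless of how many hosts there are. Each worker keeps claiming the next host until the list is exhausted.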

Here's a modified playground with a few examples of how you could use rayon.


Okay, it seems I was too focused on creating the "infrastructure" for the parallelism, whilst there was a simpler way. And simpler is better.

The elementary task that takes the time in my case is reading somewhat large chunks of data from a web server. The data read must be added to a vector together with some additional data, such as the hostname the data came from, and sometimes a timestamp.

This is what I do now:

pub fn read_statements_into_vector(
    hostname_port_vec: &Vec<&str>,
    parallel: usize
) -> Vec<StoredStatements>
{
    let pool = rayon::ThreadPoolBuilder::new().num_threads(parallel).build().unwrap();
    let (tx, rx) = channel();
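    // `move` transfers the original `tx` into the scope closure, so it is dropped
    // when the scope ends and the receive loop below can terminate.
    pool.scope(move |s| {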
        for hostname_port in hostname_port_vec {
            let tx = tx.clone();
            s.spawn(move |_| {
                let detail_snapshot_time = Local::now();
                let statements = read_statements(&hostname_port);
                tx.send((hostname_port, detail_snapshot_time, statements)).expect("channel will be waiting in the pool");
            });
        }
    });
    let mut stored_statements: Vec<StoredStatements> = Vec::new();
    for (hostname_port, detail_snapshot_time, statements) in rx {
        add_to_statements_vector(statements, hostname_port, detail_snapshot_time, &mut stored_statements);
    }
    stored_statements
}

So it's a function that receives a vector of &str with the hostnames and the desired parallelism, and then spawns one task per hostname on a pool with as many threads as the parallelism allows.
The results are inserted into a vector, together with the hostname each one came from and the time it was fetched.
Once all the results are inserted, the vector is returned.

I tried with your suggestion of using .into_par_iter(), but it runs into all kinds of issues like not wanting to iterate over hostname_port_vec, and I have to pass 3 arguments, etc.
