I ran the code at https://doc.rust-lang.org/1.7.0/book/concurrency.html
fn main() {
let (tx, rx) = mpsc::channel();
for i in 0..10 {
let tx = tx.clone();
thread::spawn(move || {
let answer = i * i;
tx.send("Thread" + i.to_string() + ":" + answer.to_string()).unwrap();
});
}
for k in 0..10 {
println!("{}", rx.recv().unwrap());
}
}
I immediately ran into a logical conundrum. Sequential processing always returns the results in the correct order, but in this example, the order of results is rather random. I tried to fix this by sending the value of i with the thread, but the compiler complains about the concatenation:
tx.send("Thread" + i.to_string() + ":" + answer.to_string()).unwrap();
rustc : error[E0369]: binary operation +
cannot be applied to type &str
-
I like to use something like Python's f-strings f"Thread {i}: {answer}", does that exist in Rust?
-
what do I do if I want to maintain the correct order?
I also tried this according to https://doc.rust-lang.org/std/sync/mpsc/
use std::sync::mpsc::sync_channel;
fn main() {
let (tx, rx) = sync_channel::<i32>(10);
for i in 0..10 {
let tx = tx.clone();
thread::spawn(move || {
let answer = i * i;
tx.send(answer).unwrap();
});
}
for _ in 0..10 {
println!("{}", rx.recv().unwrap());
}
}
This has the same incorrect-order problem.
3) what is this synchronous threading? Clearly what the mpsc page says it is and what I understood it to be is not the same thing.
1/Although it does not do exactly the same thing, you may find the format! macro interesting.
2/One important thing to keep in mind when getting started with any kind of parallelization (not just threads, but also vectorization or network distribution) is that order is a very expensive property in a parallel world. If you want to maintain it, you will need to start from the disordered output that naturally emerges from your independent threads and painstakingly put it back in the "right" order. Obviously, this comes at a performance cost, and should only be done in strategic places such as when generating user-facing output.
Here are two possible reordering strategies for your particular problem:
- Collect all the output in a Vec, then sort it.
- Instead of a queue, use a mutex-protected BinaryHeap to directly collect output in the right order, then tell the main thread when you are done filling up the heap.
Note that in both cases, the main thread will need to wait for all output to be available before starting to print it, and some extra work will be needed to put the output in the right order. That's the performance cost of order which I was talking about earlier. It is negligible in this case, but can become a significant expense in more complex programs.
3/The "synchronous" adjective is often used as a synonym of "blocking", warning you that a certain operation may block the host OS thread. Here, the MPSC page is just telling you that sending data on a bounded channel is a potentially blocking operation, which will block the client thread if the channel's underlying storage is already full of data.
Here's a fleshed out example of how you might go about this:
use std::sync::mpsc::channel;
use std::thread;
fn main() {
let (tx, rx) = channel();
for i in 0..10 {
let tx = tx.clone();
thread::spawn(move || {
let answer = i * i;
tx.send((
i,
answer,
// can also do: format!("Thread{num}: {answer}", num=i, answer=answer)
// which is like string interpolation
format!("Thread{}: {}", i, answer),
)).unwrap();
});
}
// can also use an array of Option<usize> to not have default value of 0 in there
let mut results = [0; 10];
for _ in 0..10 {
let (idx, res, string) = rx.recv().unwrap();
println!("{}", string);
results[idx] = res;
}
println!("{:?}", results);
}
Playground
As mentioned by @HadrienG, the format!
macro can be used to construct a String. There's a commented out version that uses string interpolation-like approach, but typically you'll skip that and just use placeholders, like the code above.
You can send an arbitrary type over the channel (using a tuple here) which can carry any data that you need, such as the index of the submission. On rx side, you can reassociate that index however you like (here I'm just storing the result into an array slot with the same index, which reassembles the "order" of submission).
1 Like