Hello! I need to merge sorted runs that arrive as streams of input. I'd like to know whether the following code can be rewritten without having to write try_iter().peekable() in the match arms:
use crossbeam_channel::unbounded;
use std::{thread, time::Duration};

fn main() {
    let (s1, r1) = unbounded();
    let (s2, r2) = unbounded();

    let h1 = thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        s1.send(2).unwrap();
        println!("s1 sent 2");
        thread::sleep(Duration::from_millis(100));
        s1.send(3).unwrap();
        println!("s1 sent 3");
        thread::sleep(Duration::from_millis(100));
        s1.send(42).unwrap();
        println!("s1 sent 42");
    });

    let h2 = thread::spawn(move || {
        thread::sleep(Duration::from_millis(800));
        s2.send(1).unwrap();
        println!("s2 sent 1");
        thread::sleep(Duration::from_millis(800));
        s2.send(41).unwrap();
        println!("s2 sent 41");
        thread::sleep(Duration::from_millis(800));
        s2.send(43).unwrap();
        println!("s2 sent 43");
    });

    let (mut it1, mut it2) = (r1.try_iter().peekable(), r2.try_iter().peekable());
    loop {
        match (it1.peek(), it2.peek()) {
            (Some(v1), Some(v2)) => {
                let (x, is_r1) = if v1 < v2 {
                    (v1, true)
                } else {
                    (v2, false)
                };
                println!("Output: {}", x);
                if is_r1 {
                    it1 = r1.try_iter().peekable();
                } else {
                    it2 = r2.try_iter().peekable();
                }
            }
            (Some(v), None) => {
                println!("r1 has {} and r2 is empty", v);
                thread::sleep(Duration::from_millis(200));
                it2 = r2.try_iter().peekable();
            }
            (None, Some(v)) => {
                println!("r1 is empty and r2 has {}", v);
                thread::sleep(Duration::from_millis(200));
                it1 = r1.try_iter().peekable();
            }
            (_, _) => {
                println!("Both should be non-empty, try again!");
                thread::sleep(Duration::from_millis(200));
                it1 = r1.try_iter().peekable();
                it2 = r2.try_iter().peekable();
            }
        }
    }
    unreachable!();
}
Wherever the above code prints Output: *, the values should appear in ascending order (except that it never prints Output: 43, because it never reaches the first match arm again after printing Output: 42). What I dislike most about the above snippet is the try_iter().peekable() sprinkled across the match arms. I did so because the main thread needs to peek at the head element of every non-empty channel to determine which channel holds the next smallest element.
Is there a way to improve the snippet so that I don't need to scatter try_iter().peekable() across the match arms and repeat it at every iteration of the loop? If I have to wait for more than two channels to all be non-empty, writing try_iter().peekable() in the right match arms becomes error-prone. Or, more broadly, should I use a data structure other than channels to accomplish what I want?
As a side note, I specifically use crossbeam_channel for the code snippets presented, but I believe std::sync::mpsc::channel could be used to make the same point.
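To make the "more than two channels" case concrete, here is a rough sketch of the direction I am considering, not a definitive implementation. It keeps one cached head value per channel in a single place, so the peeking logic is not repeated in match arms. Assumptions: it uses std::sync::mpsc so that it is self-contained, it blocks on recv() instead of polling with sleeps, and it treats a channel whose sender has been dropped as exhausted (so, unlike my snippet above, it also emits 43). MergeRuns is a hypothetical name of my own, not something provided by either library.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Hypothetical helper: merges already-sorted streams arriving on several channels.
struct MergeRuns<T> {
    receivers: Vec<Receiver<T>>,
    // One cached head per receiver; None means the slot needs a refill.
    heads: Vec<Option<T>>,
    // True once the matching sender is dropped and the channel is drained.
    done: Vec<bool>,
}

impl<T: Ord> MergeRuns<T> {
    fn new(receivers: Vec<Receiver<T>>) -> Self {
        let n = receivers.len();
        MergeRuns {
            receivers,
            heads: (0..n).map(|_| None).collect(),
            done: vec![false; n],
        }
    }
}

impl<T: Ord> Iterator for MergeRuns<T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        // Refill every empty slot, blocking until the channel yields a value
        // or reports that it is disconnected and empty.
        for i in 0..self.receivers.len() {
            if self.heads[i].is_none() && !self.done[i] {
                match self.receivers[i].recv() {
                    Ok(v) => self.heads[i] = Some(v),
                    Err(_) => self.done[i] = true,
                }
            }
        }
        // Find the index of the smallest cached head; None if all are exhausted.
        let idx = self
            .heads
            .iter()
            .enumerate()
            .filter_map(|(i, h)| h.as_ref().map(|v| (v, i)))
            .min()
            .map(|(_, i)| i)?;
        // Emit that head and leave its slot empty for the next refill.
        self.heads[idx].take()
    }
}

fn main() {
    let (s1, r1) = channel();
    let (s2, r2) = channel();
    let h1 = thread::spawn(move || {
        for v in vec![2, 3, 42] {
            s1.send(v).unwrap();
        }
    });
    let h2 = thread::spawn(move || {
        for v in vec![1, 41, 43] {
            s2.send(v).unwrap();
        }
    });
    let merged: Vec<i32> = MergeRuns::new(vec![r1, r2]).collect();
    h1.join().unwrap();
    h2.join().unwrap();
    println!("Output: {:?}", merged);
    assert_eq!(merged, vec![1, 2, 3, 41, 42, 43]);
}
```

The trade-off in this sketch is that next() blocks on the slowest channel, which is what a correct merge needs anyway: the next smallest element cannot be known until every live channel has a head value.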