Implementing merge sort in a stream fashion using channels

Hello! I need to implement a merge sort that runs on a stream of inputs. I'd like to know whether the following code can be rewritten without repeating try_iter().peekable() in the match arms:

use crossbeam_channel::unbounded;
use std::{thread, time::Duration};

fn main() {
    let (s1, r1) = unbounded();
    let (s2, r2) = unbounded();

    let h1 = thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        s1.send(2).unwrap();
        println!("s1 sent 2");
        thread::sleep(Duration::from_millis(100));
        s1.send(3).unwrap();
        println!("s1 sent 3");
        thread::sleep(Duration::from_millis(100));
        s1.send(42).unwrap();
        println!("s1 sent 42");
    });

    let h2 = thread::spawn(move || {
        thread::sleep(Duration::from_millis(800));
        s2.send(1).unwrap();
        println!("s2 sent 1");
        thread::sleep(Duration::from_millis(800));
        s2.send(41).unwrap();
        println!("s2 sent 41");
        thread::sleep(Duration::from_millis(800));
        s2.send(43).unwrap();
        println!("s2 sent 43");
    });

    let (mut it1, mut it2) = (r1.try_iter().peekable(), r2.try_iter().peekable());

    loop {
        match (it1.peek(), it2.peek()) {
            (Some(v1), Some(v2)) => {
                let (x, is_r1) = if v1 < v2 {
                    (v1, true)
                } else {
                    (v2, false)
                };
                println!("Output: {}", x);
                if is_r1 {
                    it1 = r1.try_iter().peekable();
                } else {
                    it2 = r2.try_iter().peekable();
                }
            }
            (Some(v), None) => {
                println!("r1 has {} and r2 is empty", v);
                std::thread::sleep(std::time::Duration::from_millis(200));
                it2 = r2.try_iter().peekable();
            }
            (None, Some(v)) => {
                println!("r1 is empty and r2 has {}", v);
                std::thread::sleep(std::time::Duration::from_millis(200));
                it1 = r1.try_iter().peekable();
            }
            (_, _) => {
                println!("Both should be non-empty, try again!");
                std::thread::sleep(std::time::Duration::from_millis(200));
                it1 = r1.try_iter().peekable();
                it2 = r2.try_iter().peekable();
            }
        }
    }

    unreachable!();
}

Rust Playground

Wherever the code above prints Output: *, the values should appear in ascending order (except that it never prints Output: 43, because it never reaches the first match arm again after printing Output: 42). What I dislike about the snippet is the try_iter().peekable() sprinkled across the match arms. I did this because the main thread needs to peek at the head of every non-empty channel to determine which channel holds the next smallest element.

Is there a way to improve the snippet so that I don't have to scatter try_iter().peekable() across the match arms and avoid doing so on every iteration of the loop? If I have to wait for more than two channels to all be non-empty, writing try_iter().peekable() in several match arms becomes error-prone. Or, more broadly, should I use a data structure other than channels to accomplish what I want?

As a side note, I specifically use crossbeam_channel for the code snippets presented, but I believe std::sync::mpsc::channel can also be used to make my point.

I suppose what you’re asking for is possible with the Select API

Rust Playground

(Edit: The above playground might not handle error cases properly, i.e. if the channel is closed.)

I’ve got to say, your use case is unclear though. Why exactly do you need two receivers that are both ready to deliver (at least) 1 item? Actually, your code example doesn’t do that either: it produces iterators containing at least 1 item for each channel (plus whatever else might be available). Why not just receive (and block) on both channels in sequence and then work with those two items (and, if wanted, whatever else is already available through the channel)?

If this is just about producing an iterator similar to what you do, then you could just do something like

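use std::iter;

// Block until each channel has delivered its first item.
let r1_first = r1.recv().unwrap();
let r2_first = r2.recv().unwrap();
// Then chain on whatever else is already queued, without blocking.
let it1 = iter::once(r1_first).chain(r1.try_iter());
let it2 = iter::once(r2_first).chain(r2.try_iter());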

Rust Playground
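
To make the "block for the first item, then take whatever else is ready" idea concrete, here is a minimal self-contained sketch. It uses std::sync::mpsc instead of crossbeam_channel so that it runs without extra crates, and the helper name first_plus_ready is made up for illustration. Unlike the snippet above, it returns an empty batch instead of panicking when the channel is closed.

```rust
use std::iter;
use std::sync::mpsc;

/// Hypothetical helper: blocks for the first item, then also takes
/// whatever is already queued. Returns an empty Vec if the channel is
/// closed and drained.
fn first_plus_ready<T>(rx: &mpsc::Receiver<T>) -> Vec<T> {
    match rx.recv() {
        // `try_iter` never blocks; it stops as soon as the queue is empty.
        Ok(first) => iter::once(first).chain(rx.try_iter()).collect(),
        Err(_) => Vec::new(),
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(1).unwrap();
    tx.send(2).unwrap();

    // Both items were queued before the call, so both end up in the batch.
    let batch = first_plus_ready(&rx);
    println!("{:?}", batch); // prints [1, 2]

    // Once every sender is gone and the queue is drained, the batch is empty.
    drop(tx);
    assert!(first_plus_ready(&rx).is_empty());
}
```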

Thank you for the suggestion @steffahn

Sorry, I left out a critical piece of information, namely what needs to be done in the first match arm. I have updated my original post. The use case is to implement merge sort in a streaming fashion, where inputs arrive continuously through multiple input channels. The elements within each channel are guaranteed to be sorted, and the rate at which elements arrive varies from channel to channel (e.g. new elements are available on channel X every second, but on channel Y only every 5 seconds). The main thread above therefore acts as the merge thread.

Your first playground link seems to point me in the right direction.

UPDATE:
I also updated the example code snippet to show what's actually done in the first match arm.
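
For reference, the streaming merge described in this post can be sketched without any peekable iterators by caching one "head" item per channel and blocking on recv() whenever a head is missing. This is only a sketch under a few assumptions: it uses std::sync::mpsc rather than crossbeam_channel or the Select API, the names merge_sorted and emit are invented for illustration, and a channel whose senders have all been dropped is treated as exhausted, so the remaining channels are simply drained.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::Duration;

/// Merges channels whose items each arrive in ascending order.
/// `heads[i]` caches the next unconsumed item of `inputs[i]`.
fn merge_sorted<T: Ord>(inputs: &[Receiver<T>], mut emit: impl FnMut(T)) {
    let mut heads: Vec<Option<T>> = inputs.iter().map(|_| None).collect();

    loop {
        // Refill every empty head. `recv` blocks until an item arrives;
        // `Err` means the channel is closed and drained, so the head
        // stays `None` and that channel no longer takes part.
        for (rx, head) in inputs.iter().zip(heads.iter_mut()) {
            if head.is_none() {
                *head = rx.recv().ok();
            }
        }

        // Find the channel holding the smallest head (ties go to the
        // lower index).
        let smallest = heads
            .iter()
            .enumerate()
            .filter_map(|(i, h)| h.as_ref().map(|v| (v, i)))
            .min()
            .map(|(_, i)| i);

        match smallest {
            Some(i) => emit(heads[i].take().unwrap()),
            None => break, // every channel is closed and drained
        }
    }
}

fn main() {
    let (s1, r1) = mpsc::channel();
    let (s2, r2) = mpsc::channel();

    // Two producers with different arrival rates, as in the original post.
    thread::spawn(move || {
        for v in [2, 3, 42] {
            thread::sleep(Duration::from_millis(10));
            s1.send(v).unwrap();
        }
    });
    thread::spawn(move || {
        for v in [1, 41, 43] {
            thread::sleep(Duration::from_millis(30));
            s2.send(v).unwrap();
        }
    });

    let mut out = Vec::new();
    merge_sorted(&[r1, r2], |v| out.push(v));
    println!("{:?}", out); // prints [1, 2, 3, 41, 42, 43]
}
```

Because each head is refilled with a blocking recv(), the loop works for any number of channels and needs no sleeping or polling. The trade-off is that output stalls until the slowest open channel delivers its next item, which a correct merge requires anyway. Unlike the original snippet, this version also emits 43, since a closed channel can no longer supply a smaller element.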