Is it possible to reuse channels?

I've tried this with crossbeam_channel, mpsc, and flume. From what I can see, the receive side is never flushed. Below, I've cloned the sender and the receiver, then reused the channel through the clones. The receiver still returns the first item in the channel even though I've already called rx1.recv().unwrap(). Am I doing something wrong, or do I need a new channel each time?

fn main() {
    let ip_route_dest = &[1, 3, 6, 1, 2, 1, 4, 21, 1, 1, ];
    let ip_route_mask = &[1, 3, 6, 1, 2, 1, 4, 21, 1, 11, ];
    let ip_route_next_hop = &[1, 3, 6, 1, 2, 1, 4, 21, 1, 7, ];
    let ip_route_proto = &[1, 3, 6, 1, 2, 1, 4, 21, 1, 9, ];
    let ip_route_age = &[1, 3, 6, 1, 2, 1, 4, 21, 1, 10, ];

    let oids: &[&[u32]] = &[ip_route_dest,
        ip_route_mask,
        ip_route_next_hop,
        ip_route_proto,
        ip_route_age];

    let (tx1, rx1) = flume::unbounded();
    let (tx2, rx2) = (tx1.clone(), rx1.clone());
    //let (tx3, rx3) = flume::unbounded();
    let db = Pool::new("host=localhost user=fiostv password=fiostv");
    let bgproute_initial_query = "SELECT * from multicast_bgproute";
    db.db_channel(tx1, bgproute_initial_query);
    let initial_query_results = rx1.recv().unwrap();
    let query = "SELECT ip_address, community FROM multicast_contentdevices WHERE sysname LIKE '%VCCR%' LIMIT 3";
    let mut agents = HashMap::new();
    db.db_channel(tx2, query);
    let query_results = rx2.recv().unwrap();
    // ...
}

db.db_channel probably sends more than one message through the channel. You need to receive all of them before you can reuse the channel. Otherwise rx2.recv() will receive messages from the first db.db_channel.
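To make that failure mode concrete, here is a minimal sketch. It uses std::sync::mpsc rather than flume so it runs without extra crates, and `send_batch` is a hypothetical stand-in for db_channel that sends several messages per call; the labels and counts are made up for illustration.

```rust
use std::sync::mpsc;

/// Hypothetical stand-in for `db_channel`: sends `count` messages per call.
fn send_batch(tx: &mpsc::Sender<String>, label: &str, count: usize) {
    for i in 0..count {
        tx.send(format!("{}-{}", label, i)).unwrap();
    }
}

/// Calls `recv()` exactly once after each batch and returns both results.
fn one_recv_per_query() -> (String, String) {
    let (tx, rx) = mpsc::channel();

    send_batch(&tx, "first", 3);
    let a = rx.recv().unwrap(); // oldest message of the first batch

    send_batch(&tx, "second", 3);
    // The channel is FIFO, so this is still a leftover from the first batch.
    let b = rx.recv().unwrap();

    (a, b)
}

fn main() {
    let (a, b) = one_recv_per_query();
    println!("after first query:  {}", a);
    println!("after second query: {}", b);
}
```

The second `recv()` returns a message from the first batch because two of its three messages were never consumed.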

Yes, this should work. For example, this simple program prints "hello" and then "world":

fn main() {
    let (tx1, rx1) = crossbeam_channel::unbounded();
    let (tx2, rx2) = (tx1.clone(), rx1.clone());
    
    tx1.send("hello").unwrap();
    dbg!(rx1.recv().unwrap());

    tx2.send("world").unwrap();
    dbg!(rx2.recv().unwrap());
}

Are you sure that db_channel is sending exactly one message?

Oh, it's not. It's sending all rows back from a database query. How does one make sure they are getting all of the data? Below is the code. I wonder if I should push the threads to a Vec?

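pub fn db_channel(&self, sender: flume::Sender<Vec<Row>>, query: &'static str) {
    if query.starts_with("SELECT") {
        // Spawns 20 threads. Each one runs the same query and sends its rows,
        // so a single call can put up to 20 messages on the channel.
        for _ in 0..20 {
            let pool = self.pool.clone();
            let s = sender.clone();
            // `query` is a `&'static str`, which is `Copy`, so it can be moved
            // into the closure directly without cloning.
            thread::spawn(move || {
                let mut client = pool.get().unwrap();
                let rows = client.query(query, &[]).unwrap();
                if let Err(e) = s.send(rows) {
                    println!("Error in {}: {:?}", query, e);
                }
            });
        }
    }
}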

That would work, or you could use a method like Receiver::try_iter or drain to consume messages until the channel is empty.
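As a rough sketch of the "push the threads to a Vec" idea combined with try_iter: keep the JoinHandles, join them all, and only then empty the channel. This uses std::sync::mpsc instead of flume so it runs without extra crates, and `spawn_workers` is a hypothetical stand-in for db_channel that returns its handles (the real method does not).

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for `db_channel`: spawns `workers` threads, each
/// sending one batch, and returns the handles so the caller can join them.
fn spawn_workers(tx: &mpsc::Sender<Vec<u32>>, workers: u32) -> Vec<thread::JoinHandle<()>> {
    (0..workers)
        .map(|id| {
            let tx = tx.clone();
            thread::spawn(move || {
                // Pretend these are the rows returned by a query.
                tx.send(vec![id, id * 10]).unwrap();
            })
        })
        .collect()
}

/// Waits for every worker, then takes everything that is in the channel.
fn collect_all(workers: u32) -> Vec<Vec<u32>> {
    let (tx, rx) = mpsc::channel();
    let handles = spawn_workers(&tx, workers);

    // Once every thread has been joined, all sends have completed.
    for handle in handles {
        handle.join().unwrap();
    }

    // `try_iter` yields queued messages and stops when the channel is empty.
    let mut batches: Vec<Vec<u32>> = rx.try_iter().collect();
    batches.sort(); // arrival order depends on thread scheduling
    batches
}

fn main() {
    println!("{:?}", collect_all(3));
}
```

Joining first matters: emptying the channel while workers are still running can return early and leave later messages behind for the next query.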

If you don't actually want or need to consume all of the messages from one query before you start your next query, then creating a separate channel for each query is probably simplest.
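A minimal sketch of the one-channel-per-query approach, again with std::sync::mpsc so it runs without extra crates. `run_query` is a hypothetical helper, not part of the original Pool type; it creates its own channel and hands back the receiver.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: creates a fresh channel for one query, spawns the
/// workers, and returns only the receiving end.
fn run_query(label: &'static str, workers: usize) -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel();
    for i in 0..workers {
        let tx = tx.clone();
        thread::spawn(move || {
            // Ignore the error if the receiver was dropped early.
            let _ = tx.send(format!("{}-{}", label, i));
        });
    }
    // The original `tx` is dropped here, so the channel closes as soon as
    // the last worker finishes and drops its clone.
    rx
}

/// Reads until the channel is closed, i.e. until every sender is gone.
fn collect_sorted(rx: mpsc::Receiver<String>) -> Vec<String> {
    let mut results: Vec<String> = rx.iter().collect();
    results.sort(); // arrival order depends on thread scheduling
    results
}

fn main() {
    // Both queries run concurrently, but their results cannot mix.
    let first = run_query("first", 3);
    let second = run_query("second", 2);
    println!("{:?}", collect_sorted(first));
    println!("{:?}", collect_sorted(second));
}
```

Because each receiver only ever sees messages from its own query, `rx.iter()` ending doubles as the "all data has arrived" signal, with no need to track thread handles.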

Thanks! I'm going to try both, then decide. And why not, I'll learn more this way.

What I ended up with...

// Call drain() once and iterate over it; calling it inside the loop
// condition would build a new iterator on every pass.
for rows in rx1.drain() {
    for item in rows {
        let route = Route {
            cd_id: item.get(5),
            prefix: item.get(1),
            net_mask: item.get(6),
            peer: item.get(2),
            age: item.get(3),
            last_seen: item.get(4),
            missing: item.get(7),
        };
        initial_hs.insert(route);
    }
}
