tokio::sync::oneshot::Receiver closed unexpectedly

Hi,

I have the code snippet below, which uses tokio::sync::oneshot. Unexpectedly, the rx is always closed before the tx can send the message:

 // One-shot channel used to hand the DNS server list from the spawned receiver
 // task back to this future.
 let (mut tx, rx) = tokio::sync::oneshot::channel::<Vec<Ipv4Addr>>();

    // Fused receiver; it is polled by mutable reference in the `select!` further down.
    // NOTE(review): `fuse` presumably comes from `futures::FutureExt` — confirm the import.
    let mut rx = rx.fuse();

    // Share the socket: `r` moves into the spawned task for receiving,
    // `s` stays in this future for sending.
    let r = Arc::new(socket);
    let s = r.clone();

    // Transaction id of the outgoing message; replies are matched against it.
    let xid = msg.xid();
    tokio::spawn(async move {
        let mut buf = vec![0u8; 576];

        // Reads datagrams until one parses as a DHCP Offer with a matching xid that
        // carries a DomainNameServer option, then resolves to a clone of that list.
        // A failed `recv_from` panics the task via `expect`.
        let get_response = async move {
            loop {
                let (n_read, _) = r
                    .recv_from(&mut buf)
                    .await
                    .expect("failed to receive DHCP offer");

                // Deeply nested validation chain:
                // parse -> MessageType option -> Offer -> xid match -> DomainNameServer option.
                if let Ok(reply) = dhcproto::v4::Message::from_bytes(&buf[..n_read]) {
                    if let Some(op) = reply.opts().get(dhcproto::v4::OptionCode::MessageType) {
                        match op {
                            dhcproto::v4::DhcpOption::MessageType(msg_type) => {
                                if msg_type == &dhcproto::v4::MessageType::Offer {
                                    if reply.xid() == xid {
                                        if let Some(op) = reply
                                            .opts()
                                            .get(dhcproto::v4::OptionCode::DomainNameServer)
                                        {
                                            match op {
                                                dhcproto::v4::DhcpOption::DomainNameServer(dns) => {
                                                    debug!("got NS servers {:?} from DHCP", dns);
                                                    return dns.clone();
                                                }
                                                _ => yield_now().await,
                                            }
                                        }
                                    }
                                    // Reached for any Offer that did not return above (xid mismatch
                                    // or no usable DNS option): yield, then keep listening.
                                    yield_now().await
                                }
                            }
                            _ => yield_now().await,
                        }
                    }
                }
            }
        };

        // Race the receive loop against the receiver going away. `tx.closed()` completes
        // once the receiving half is closed or dropped. `rx` is a local of the outer
        // future, so this also fires when that future is dropped (cancelled) by its
        // caller before it finishes.
        tokio::select! {
            _ = tx.closed() => {debug!("rx closed, quiting")},
            value = get_response => tx.send(value).expect("must send")
        }
    });

    // Send the encoded message to the broadcast address on port 67.
    s.send_to(&msg.to_vec().expect("must encode"), "255.255.255.255:67")
        .await?;

    debug!("waiting for DHCP reply");

    // Only referenced by the commented-out `interval.tick()` arm below.
    let mut interval = interval(Duration::from_millis(100));

    // Wait for the task's answer, or give up after 10 seconds.
    // The timeout arm returns from the enclosing function directly, so the
    // `debug!("rx dropped here")` line below only runs when the `rx` arm completes.
    let result = tokio::select! {
        result = &mut rx => result.map_err(|x| io::Error::new(io::ErrorKind::Other, "channel error")),
        // _ = interval.tick() => { debug!("Another 100ms"); return Err(io::Error::new(io::ErrorKind::Other, "DHCP timeout")) },
        _ = tokio::time::sleep(Duration::from_secs(10)) => {
            debug!("DHCP timeout after 10 secs");
         return Err(io::Error::new(io::ErrorKind::Other, "dhcp timeout"));
            }

    };

    debug!("rx dropped here");

    result

The debug!("rx closed, quiting") line is always executed; however,

the debug!("rx dropped here"); line is never executed. What could cause the rx to close? I don't see it being dropped by going out of scope.

For anyone experiencing a similar issue:

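This one was caused by an outer select! cancelling the future that owns the rx: when that future is dropped, the rx is dropped with it, so tx.closed() resolves.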

I guess it's an obvious answer for seasoned users, but it's a bit hard to realize at first.

Further reading: https://www.reddit.com/r/rust/comments/y4m6bb/comment/isf48h8/?utm_source=share&utm_medium=web2x&context=3

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.