Hi,
I have the code snippet below, which uses tokio::sync::oneshot, and the rx is unexpectedly always closed before the tx can send the message:
// Sends an encoded DHCP message to the broadcast address and waits up to 10 seconds for a
// matching Offer carrying a DomainNameServer option. A spawned task does the receiving and
// hands the server list back over a oneshot channel.
let (mut tx, rx) = tokio::sync::oneshot::channel::<Vec<Ipv4Addr>>();
// Fused so the receiver can be polled by reference (`&mut rx`) in the `select!` below.
// NOTE(review): presumably `futures::FutureExt::fuse` — confirm against the imports.
let mut rx = rx.fuse();
// Two handles to the same socket: `r` is moved into the receiver task, `s` stays here to send.
let r = Arc::new(socket);
let s = r.clone();
// Transaction id of the outgoing message; replies are matched against it.
let xid = msg.xid();
tokio::spawn(async move {
let mut buf = vec![0u8; 576];
// Loops over incoming datagrams until one yields a DNS server list.
let get_response = async move {
loop {
let (n_read, _) = r
.recv_from(&mut buf)
.await
.expect("failed to receive DHCP offer");
// Only a datagram that parses, is an Offer, matches `xid`, and carries a
// DomainNameServer option ends the loop; anything else goes back to `recv_from`.
if let Ok(reply) = dhcproto::v4::Message::from_bytes(&buf[..n_read]) {
if let Some(op) = reply.opts().get(dhcproto::v4::OptionCode::MessageType) {
match op {
dhcproto::v4::DhcpOption::MessageType(msg_type) => {
if msg_type == &dhcproto::v4::MessageType::Offer {
if reply.xid() == xid {
if let Some(op) = reply
.opts()
.get(dhcproto::v4::OptionCode::DomainNameServer)
{
match op {
dhcproto::v4::DhcpOption::DomainNameServer(dns) => {
debug!("got NS servers {:?} from DHCP", dns);
return dns.clone();
}
_ => yield_now().await,
}
}
}
// Reached for an Offer that did not produce a result (xid mismatch or no DNS option).
yield_now().await
}
}
_ => yield_now().await,
}
}
}
}
};
// Race the receive loop against the receiver going away, so the task does not outlive
// its consumer. `tx.closed()` completes once the `rx` half has been dropped or closed.
tokio::select! {
_ = tx.closed() => {debug!("rx closed, quiting")},
value = get_response => tx.send(value).expect("must send")
}
});
// NOTE(review): the `?` below returns from the enclosing function if `send_to` fails. That
// drops `rx`, so the task logs "rx closed, quiting" while "rx dropped here" is never reached.
// Sending to 255.255.255.255 may fail if broadcast is not enabled on `socket` — TODO confirm
// by logging the error from `send_to`.
s.send_to(&msg.to_vec().expect("must encode"), "255.255.255.255:67")
.await?;
debug!("waiting for DHCP reply");
// Only referenced by the commented-out branch inside the `select!` below.
let mut interval = interval(Duration::from_millis(100));
let result = tokio::select! {
// The channel error value `x` is discarded and replaced by a generic io::Error.
result = &mut rx => result.map_err(|x| io::Error::new(io::ErrorKind::Other, "channel error")),
// _ = interval.tick() => { debug!("Another 100ms"); return Err(io::Error::new(io::ErrorKind::Other, "DHCP timeout")) },
// Timeout path: returns early, so the final `debug!` below is skipped here as well.
_ = tokio::time::sleep(Duration::from_secs(10)) => {
debug!("DHCP timeout after 10 secs");
return Err(io::Error::new(io::ErrorKind::Other, "dhcp timeout"));
}
};
// Only reached when the `rx` branch of the `select!` completed (with a value or a channel error).
debug!("rx dropped here");
result
The debug!("rx closed, quiting")
line is always executed; however,
the debug!("rx dropped here");
line is never executed. What could cause the rx to close? I don't see where it would be dropped by going out of scope.