UDP and tokio -- packets received but never sent

It's the time of the year when I get back to Rust and try to implement something, to see if I'm happy with the state of the Rust world yet. This time I'm stuck with UDP and tokio, and any help would be appreciated.

I'm trying to re-write dhcp4r to use tokio (to bundle it with dns-server and iron). What I need in a nutshell is to receive all UDP packets and reply to some of those.

I tried combining a bunch of examples I found online into something that works... to some extent.

I've got a DHCPCodec:

struct DHCPCodec;

impl UdpCodec for DHCPCodec {
  type In = (SocketAddr, Vec<u8>);
  type Out = (SocketAddr, Vec<u8>);

  fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
    Ok((*addr, buf.to_vec()))
  }

  fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr {
    into.extend(buf);
    addr
  }
}

In a perfect world that would decode and encode actual DHCP packets, but dhcp4r's Packet doesn't support that yet (its lifetime is bound to the source buffer), so I wanted an easy way out.

My event loop:

pub fn run(h: Handle) -> Result<()> {
  let addr = "0.0.0.0:67"
    .parse()
    .chain_err(|| "cannot parse DHCPServer listen addr")?;
  let sock = UdpSocket::bind(&addr, &h).chain_err(|| "cannot bind udp socket")?;

  let (sink, stream) = sock.framed(DHCPCodec).split();

  let reader = stream
    .map(|(addr, msg)| match packet::decode(&msg) {
      Ok(p) => DHCPServer::handle_request(addr, p),
      Err(e) => {
        error!("failed to parse dhcp packet from {}: {}", addr, e);
        None
      }
    })
    .filter(|ret| ret.is_some())
    .map(|ret| ret.unwrap());

  let writer = sink.send_all(reader);

  h.spawn(writer.then(|_| Ok(())));

  Ok(())
}

I do a map().filter().map() thing here because I only need to reply to some packets, not all of them. Is there any way to do it better? I really don't like how unwrap() sits there being totally useless.

Now, to the actual problem. This code doesn't work :)

Here's what I see in the logs:

DEBUG tokio_core::reactor                     > loop poll - Duration { secs: 2, nanos: 903952464 }
DEBUG tokio_core::reactor                     > loop time - Instant { tv_sec: 1582, tv_nsec: 967097264 }
TRACE tokio_core::reactor                     > event Readable | Writable Token(6)
DEBUG tokio_core::reactor                     > notifying a task handle
DEBUG tokio_core::reactor                     > loop process - 1 events, Duration { secs: 0, nanos: 704341 }
DEBUG tokio_core::reactor                     > loop poll - Duration { secs: 0, nanos: 16493 }
DEBUG tokio_core::reactor                     > loop time - Instant { tv_sec: 1582, tv_nsec: 967930848 }
TRACE tokio_core::reactor                     > event Readable Token(7)
TRACE tokio_core::net::udp::frame             > received 300 bytes, decoding
TRACE tokio_core::net::udp::frame             > frame decoded from buffer
DEBUG mdserver::dhcp                          > dhcp discover from 0.0.0.0:68
INFO  mdserver::dhcp                          > offering (V4(255.255.255.255:68), [...])
INFO  mdserver::dhcp                          > filter Some((V4(255.255.255.255:68), [...]))
TRACE tokio_core::net::udp::frame             > sending frame
TRACE tokio_core::net::udp::frame             > frame encoded; length=278
DEBUG tokio_core::reactor                     > scheduling direction for: 2
DEBUG tokio_core::reactor                     > blocking
TRACE tokio_core::net::udp::frame             > flushing frame; length=278
DEBUG tokio_core::reactor                     > dropping I/O source: 2
DEBUG tokio_core::reactor                     > loop process - 1 events, Duration { secs: 0, nanos: 5929916 }
DEBUG tokio_core::reactor                     > loop poll - Duration { secs: 0, nanos: 12576 }
DEBUG tokio_core::reactor                     > loop time - Instant { tv_sec: 1582, tv_nsec: 974023526 }
DEBUG tokio_core::reactor                     > loop process - 0 events, Duration { secs: 0, nanos: 184472 } 

The outgoing packet never leaves the interface, as far as tcpdump is concerned. Moreover, the server ignores all further requests (if I return None from DHCPServer::handle_request, it at least keeps receiving packets).

What's happening here? How should I rewrite my event loop to make it work?

You can use Stream::filter_map to do the filtering and mapping in one place.
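
To illustrate the shape of that change, here is a minimal sketch using std's Iterator::filter_map, which takes a closure returning an Option in the same way. The decode and handle_request functions below are hypothetical stand-ins for packet::decode and DHCPServer::handle_request, not the real dhcp4r API.

```rust
use std::net::SocketAddr;

type Reply = (SocketAddr, Vec<u8>);

// Hypothetical stand-in for packet::decode: the "packet" is just its first byte.
fn decode(msg: &[u8]) -> Result<u8, String> {
    msg.first().copied().ok_or_else(|| "empty packet".to_string())
}

// Hypothetical stand-in for DHCPServer::handle_request: only op code 1 gets a reply.
fn handle_request(addr: SocketAddr, op: u8) -> Option<Reply> {
    if op == 1 {
        Some((addr, vec![2]))
    } else {
        None
    }
}

// One filter_map replaces map().filter().map(): returning None drops the item,
// returning Some(reply) passes the unwrapped reply downstream.
fn replies(incoming: Vec<(SocketAddr, Vec<u8>)>) -> Vec<Reply> {
    incoming
        .into_iter()
        .filter_map(|(addr, msg)| match decode(&msg) {
            Ok(op) => handle_request(addr, op),
            Err(e) => {
                eprintln!("failed to parse packet from {}: {}", addr, e);
                None
            }
        })
        .collect()
}

fn main() {
    let addr: SocketAddr = "127.0.0.1:68".parse().unwrap();
    let incoming = vec![(addr, vec![1]), (addr, vec![]), (addr, vec![9])];
    println!("{:?}", replies(incoming));
}
```

The closure body is the same match as in the original map() call; only the trailing filter() and unwrap() go away.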

How are you running the Core itself (i.e. Core::run)? What you've shown in this post is spawning a "background" task onto the reactor.

Sure, the core is running at the very end of my main:

core.run(futures::future::empty::<(), ()>()).unwrap();

and it reliably serves http requests.

Ok cool, just checking :)

h.spawn(writer.then(|_| Ok(())));

The closure passed to then will silently eat an error. I'm assuming you've verified that the writer chain isn't erroring with anything?
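
As a rough sketch of the difference, using plain std types rather than futures: both functions below are hypothetical and only model what a then closure does with the Result it is handed. The first discards it, the second records the error before returning Ok(()).

```rust
use std::io;

// Models `then(|_| Ok(()))`: whatever happened, the caller sees success.
fn swallow(_res: io::Result<usize>) -> Result<(), ()> {
    Ok(())
}

// Same return type, but the error is recorded before being discarded.
// In the real event loop this would be a log call such as error!(...).
fn log_then_ok(res: io::Result<usize>, log: &mut Vec<String>) -> Result<(), ()> {
    if let Err(e) = res {
        log.push(format!("dhcp writer failed: {}", e));
    }
    Ok(())
}

fn main() {
    let mut log = Vec::new();
    let failed = || Err(io::Error::new(io::ErrorKind::PermissionDenied, "EACCES"));

    // Both calls report success, but only the second leaves a trace.
    assert_eq!(swallow(failed()), Ok(()));
    assert_eq!(log_then_ok(failed(), &mut log), Ok(()));
    println!("{:?}", log);
}
```

In the spawned task the equivalent would presumably be a closure of the form |res| { if let Err(e) = res { error!(...) } Ok(()) }, though that variant is untested here.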

Ah right, thanks!

sendto(8, "\2\1\6\0\362\203\"/\0\0\0\0\0\0\0\0\n\0\0c\0\0\0\0\0\0\0\0\10\0'\301"..., 278, MSG_NOSIGNAL, {sa_family=AF_INET, sin_port=htons(68), sin_addr=inet_addr("255.255.255.255")}, 16) = -1 EACCES (Permission denied)

I'll go refresh my knowledge on UDP then.

You probably need to enable SO_BROADCAST via set_broadcast on tokio_core::net::UdpSocket.
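
A minimal sketch of setting the flag, shown with std::net::UdpSocket so it runs without an event loop; the assumption is that the tokio_core socket's set_broadcast behaves the same way. The bind_broadcast helper and the loopback address are illustrative only.

```rust
use std::io;
use std::net::UdpSocket;

// Hypothetical helper: bind a UDP socket and enable SO_BROADCAST on it.
// Without the flag, sending to 255.255.255.255 is rejected by the OS
// (EACCES on Linux, as in the strace output above).
fn bind_broadcast(addr: &str) -> io::Result<UdpSocket> {
    let sock = UdpSocket::bind(addr)?;
    sock.set_broadcast(true)?;
    Ok(sock)
}

fn main() -> io::Result<()> {
    // Port 0 lets the OS pick a free port; a real DHCP server would bind port 67.
    let sock = bind_broadcast("127.0.0.1:0")?;
    println!("broadcast enabled: {}", sock.broadcast()?);
    Ok(())
}
```

In the run function from the question, the call would go right after UdpSocket::bind and before framed().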

Also, yeah: don't eat errors, at least log them :). The EACCES should've surfaced in the Result given to your then closure.

Yep, it was the missing SO_BROADCAST. Now I only need to figure out why Rust receives packets on eth1 and replies on eth0. Anyway, that's not a tokio problem.