Yet another ignorant `futures` beginner


#1

Hi,

This is my first try with futures:

struct EndmiCodec;

impl UdpCodec for EndmiCodec {
    type In = (SocketAddr, endmi::Message);
    type Out = (SocketAddr, endmi::Message);
    fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> std::io::Result<Self::In> {
        match endmi::Message::decode(buf) {
            Ok(msg) => Ok((*addr, msg)),
            Err(e) => Err(std::io::Error::from(std::io::ErrorKind::InvalidData)),
        }
    }

    fn encode(&mut self, (addr, msg): Self::Out, into: &mut Vec<u8>) -> SocketAddr {
        into.extend(msg.encode());
        addr
    }
}

main!(|args: Cli, log_level: verbosity| {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), endmi::PORT);
    let listener = UdpSocket::bind(&addr, &handle).unwrap();
    listener.set_broadcast(true);
    info!("listening on {}", addr);

    let (sink, stream) = listener.framed(EndmiCodec).split();

    let server = sink.send((addr, endmi::Message::WhosThere)).and_then(
        stream.for_each(move |(addr, msg)| {
            println!("received: {:?} from {}", msg, addr);
            Ok(())
        }),
    );

    core.run(server).unwrap();
});

This doesn’t compile, with the extra-long errors I enjoyed reading on other poor souls’ request for help:

error[E0277]: the trait bound `futures::stream::ForEach<futures::stream::SplitStream<tokio_core::net::UdpFramed<EndmiCodec>>, [closure@src/main.rs:60:25: 63:10], std::result::Result<(), std::io::Error>>: std::ops::FnOnce<(futures::stream::SplitSink<tokio_core::net::UdpFramed<EndmiCodec>>,)>` is not satisfied
  --> src/main.rs:59:63
   |
59 |     let server = sink.send((addr, endmi::Message::WhosThere)).and_then(
   |                                                               ^^^^^^^^ the trait `std::ops::FnOnce<(futures::stream::SplitSink<tokio_core::net::UdpFramed<EndmiCodec>>,)>` is not implemented for `futures::stream::ForEach<futures::stream::SplitStream<tokio_core::net::UdpFramed<EndmiCodec>>, [closure@src/main.rs:60:25: 63:10], std::result::Result<(), std::io::Error>>`

error[E0277]: the trait bound `futures::stream::ForEach<futures::stream::SplitStream<tokio_core::net::UdpFramed<EndmiCodec>>, [closure@src/main.rs:60:25: 63:10], std::result::Result<(), std::io::Error>>: std::ops::FnOnce<(futures::stream::SplitSink<tokio_core::net::UdpFramed<EndmiCodec>>,)>` is not satisfied
  --> src/main.rs:66:10
   |
66 |     core.run(server).unwrap();
   |          ^^^ the trait `std::ops::FnOnce<(futures::stream::SplitSink<tokio_core::net::UdpFramed<EndmiCodec>>,)>` is not implemented for `futures::stream::ForEach<futures::stream::SplitStream<tokio_core::net::UdpFramed<EndmiCodec>>, [closure@src/main.rs:60:25: 63:10], std::result::Result<(), std::io::Error>>`
   |
   = note: required because of the requirements on the impl of `futures::Future` for `futures::AndThen<futures::sink::Send<futures::stream::SplitSink<tokio_core::net::UdpFramed<EndmiCodec>>>, _, futures::stream::ForEach<futures::stream::SplitStream<tokio_core::net::UdpFramed<EndmiCodec>>, [closure@src/main.rs:60:25: 63:10], std::result::Result<(), std::io::Error>>>`

error: aborting due to 2 previous errors

What am I doing wrong here ?
Bonus question: why is rustfmt inserting a comma after stream.for_each(...) ?


#2
let server = sink.send((addr, endmi::Message::WhosThere)).and_then(

You probably intended for and_then to have a closure (ie and_then(|s| ...)), rather than try to return the future there as the closure :slight_smile:


#3

Plain and simple. Thanks @vitalyd (I think you’re the single most helpful guy on this forum).