Tokio UDP send, then receive

I've been working on a system that implements a small request/response protocol over UDP. The server uses Tokio, but I'm having trouble converting the initial version of the client, which used standard synchronous calls, to Tokio. Strictly speaking, the sequence is synchronous: the receive should happen only after a successful send. I thought using Tokio would make it easier to add more complex behaviors, such as imposing a timeout, or a retry, on a tardy response.

The code uses a UdpCodec trait and a UdpFramed socket. This is the codec:

impl UdpCodec for ClientCodec {
    type In = Result<SessResp, AccessError>;
    type Out = (SocketAddr, IpAddr);

    fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
        println!("receiving from {}", addr);
        Ok(match SessResp::from_msg(&buf) {
            Ok(recv_resp) => Ok(recv_resp),
            Err(e) => Err(e),
        })
    }

    fn encode(&mut self, (remote_addr, client_addr): Self::Out, into: &mut Vec<u8>) -> SocketAddr {
        let msg = AccessReq::new(ReqType::TimedAccess, client_addr).to_msg();

        println!("sending");
        self.state.nonce.increment_le_inplace();
        into.extend(&self.state.nonce[..]);
        let encrypted_req_packet = box_::seal(&msg, &self.state.nonce,
                                              &self.key_data.peer_public,
                                              &self.key_data.secret);
        into.extend(encrypted_req_packet);
        remote_addr
    }
}

This is the code that uses the socket and codec:

let codec = ClientCodec::new(state_filename, key_data_filename)?;
let sock = UdpSocket::bind(&bind_addr, &handle)
    .map_err(|e| { AccessError::IoError(e) })?;
let (framed_tx, framed_rx) = sock.framed(codec).split();

let send_req = framed_tx.send((remote_addr, client_addr));
Ok(drop(core.run(send_req)))

This works, but it's only the send part of the protocol. I tried numerous things to tack on a receive after the send. The simplest was:

let send_req = framed_tx.send((remote_addr, client_addr))
    .and_then(|_| { framed_rx.take(1) } );

This resulted in:

|
102 | .and_then(|_| { framed_rx.take(1) } );
| ^^^^^^^^ the trait futures::Future is not implemented for futures::stream::Take<futures::stream::SplitStream<tokio_core::net::UdpFramed<ClientCodec>>>
|
= note: required because of the requirements on the impl of futures::IntoFuture for futures::stream::Take<futures::stream::SplitStream<tokio_core::net::UdpFramed<ClientCodec>>>

error[E0277]: the trait bound futures::stream::Take<futures::stream::SplitStream<tokio_core::net::UdpFramed<ClientCodec>>>: futures::Future is not satisfied

I think this boils down to "take() does not return a future, and that's what's needed here". I tried tacking on the into_future() call:

let send_req = framed_tx.send((remote_addr, client_addr))
    .and_then(|_| { framed_rx.take(1).into_future() } );

But that resulted in:

102 |         .and_then(|_| { framed_rx.take(1).into_future() } );
    |          ^^^^^^^^ expected tuple, found struct `std::io::Error`
    |
    = note: expected type `(std::io::Error, futures::stream::Take<futures::stream::SplitStream<tokio_core::net::UdpFramed<ClientCodec>>>)`
               found type `std::io::Error`

I tried a few other things, but this is a puzzle that I haven't been able to solve by experimenting and course-correcting based on hints from the error messages. What is the technique for reading a packet on successful completion of the send?

The thing is that the AndThen structure returned by and_then expects the closure to return a future with the same Error type as the caller:

impl<A, B, F> Future for AndThen<A, B, F> where
    A: Future,
    B: IntoFuture<Error = A::Error>,
    F: FnOnce(A::Item) -> B

Here, send returns a future with std::io::Error as the error type while the into_future() of a Stream has (std::io::Error, Stream) as the error type.

The solution would be to drop the stream from the error so that the two error types match:

let send_req = framed_tx.send((remote_addr, client_addr))
    .and_then(|_| { framed_rx.take(1).into_future().map_err(|(e,_)| e) } );
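
The same constraint exists on plain Result::and_then, which may be an easier place to see it. This is only an analogy using the standard library, not futures code: send and recv below are hypothetical stand-ins for framed_tx.send(..) and framed_rx.take(1).into_future(), and the &str in the error tuple stands in for the returned stream.

```rust
use std::io;

// Hypothetical stand-in for `framed_tx.send(..)`: the error type is io::Error.
fn send() -> Result<(), io::Error> {
    Ok(())
}

// Hypothetical stand-in for `framed_rx.take(1).into_future()`: the error is a
// tuple carrying extra baggage (a &str here, the stream in the real code).
fn recv(fail: bool) -> Result<u8, (io::Error, &'static str)> {
    if fail {
        Err((io::Error::new(io::ErrorKind::Other, "recv failed"), "stream"))
    } else {
        Ok(42)
    }
}

fn send_then_recv(fail: bool) -> Result<u8, io::Error> {
    // `send().and_then(|_| recv(fail))` does not compile, because and_then
    // requires the closure to return a Result with the same error type.
    // Dropping the second tuple element makes the two error types agree.
    send().and_then(|_| recv(fail).map_err(|(e, _)| e))
}

fn main() {
    println!("{:?}", send_then_recv(false));
    println!("{:?}", send_then_recv(true).map_err(|e| e.to_string()));
}
```

The map_err closure has the same shape as the one in the fix above: it destructures the tuple and keeps only the io::Error.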

Note: Streams will probably be closed in case of an error in the next major release of futures (see futures doc). So into_future will probably no longer return the stream along with the error, and the two error types will match. It's unclear how the API will evolve, though.

Yes, that worked. Thanks.

Can you describe the procedure you used for figuring this out? Did the error message indicate the problem clearly to you or did you know what to do based on prior experience? Stringing together these futures with combinators has always been tricky and I wonder if there's some technique for getting it right without lots of flailing around.

Side question: if "send" returns a future, why doesn't take() return one? The read operation seems likely to incur a wait (and complete in the future).

The error could be clearer, but it simply means that there is a mismatch between two types. Forget about the terms 'expected' and 'found', because one type is not necessarily more expected than the other; it's just that the compiler has to infer a type from somewhere first.
The thing is that it does not tell you that it's specifically the Error type of the returned future that is the problem. So when you encounter that error, you have to check the trait bounds and any other type constraints in play.
And yes, here it's my experience with futures that helped me figure out quickly that the Error type of the AndThen is the issue.

For your side question, you have to think of a Stream as an async iterator (it's presented like this in the doc) where each value is a future.
So when you call take on an iterator, it returns another iterator which contains the first n elements. It's the same for Stream: take just returns another stream with the first n elements.

It cannot just return a future (or just a value in the case of the iterator) because it can return more than one element. You are calling it with its argument set to one, but that's not its only possible value :wink:
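
To make the iterator half of that comparison concrete, here is a small sketch using only the standard library. The function names and the packet values are made up for illustration.

```rust
// `take` turns one iterator into another; it never hands back a bare value.
fn first_n(n: usize) -> Vec<u32> {
    let packets = vec![10u32, 20, 30];
    packets.into_iter().take(n).collect()
}

// Even with n == 1 the result is still an iterator. A separate step
// (`next()` here, `into_future()` for a stream) pulls the single item out.
fn first_one() -> Option<u32> {
    let packets = vec![10u32, 20, 30];
    packets.into_iter().take(1).next()
}

fn main() {
    println!("{:?}", first_n(2));
    println!("{:?}", first_one());
}
```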

Another way to think of take is as an adapter, similar to how iterators have adapters. These adapters don't "initiate" any action on their own, but simply modify existing results as they're produced. In the case of take it just limits how many results (futures in this case) are produced, but it doesn't produce/initiate any on its own.
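
A minimal sketch of that "adapters don't initiate anything" point, again with std iterators rather than streams. The counter and the function name are hypothetical, added only to observe when items are actually produced.

```rust
use std::cell::Cell;

// Returns how many items the source had produced right after `take` was
// applied, and how many it had produced once the result was consumed.
fn produced_before_and_after() -> (u32, u32) {
    let produced = Cell::new(0u32);
    // An endless source that counts every item it hands out.
    let source = (1..).map(|x| {
        produced.set(produced.get() + 1);
        x
    });
    // Applying the adapter does not pull anything from the source.
    let limited = source.take(2);
    let before = produced.get();
    // Only consuming the adapter drives the source, and only for two items.
    let _items: Vec<u32> = limited.collect();
    let after = produced.get();
    (before, after)
}

fn main() {
    println!("{:?}", produced_before_and_after());
}
```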

Thank you both for the explanations. Having read those (and that document), it makes more sense. If I said take(5), then a "future" for all five of them wouldn't make sense. It really is an iterator-like entity, returning a future for each element in the sequence.

Given all that, I was able to realize a further goal: adding a timeout on the response. This is the code:

let send_req = framed_tx.send((remote_addr, client_addr))
    .and_then(|_| { framed_rx.take(1).into_future().map_err(|(e,_)| e) })
    .select2(Timeout::new(Duration::from_secs(5), &handle).unwrap()
             .then(|t| { println!("no response from {}", remote_addr); t }));
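
For comparison, the blocking equivalent of "send, then wait up to a timeout for one reply" can be sketched with std::net::UdpSocket alone. This is not the original synchronous client: the request function, the demo harness, the loopback echo thread, the buffer size and the 200 ms timeout are all assumptions made so the sketch runs on its own.

```rust
use std::io;
use std::net::{SocketAddr, UdpSocket};
use std::time::Duration;

// Send one request, then block for at most `timeout` waiting for one reply.
fn request(sock: &UdpSocket, server: SocketAddr, req: &[u8], timeout: Duration)
    -> io::Result<Vec<u8>>
{
    sock.send_to(req, server)?;
    sock.set_read_timeout(Some(timeout))?;
    let mut buf = [0u8; 1500];
    // Returns an error (WouldBlock or TimedOut, depending on the platform)
    // if nothing arrives before the timeout expires.
    let (len, _from) = sock.recv_from(&mut buf)?;
    Ok(buf[..len].to_vec())
}

// Hypothetical loopback "server" that echoes one packet, or stays silent.
fn demo(reply: bool) -> io::Result<Vec<u8>> {
    let server = UdpSocket::bind("127.0.0.1:0")?;
    let client = UdpSocket::bind("127.0.0.1:0")?;
    let server_addr = server.local_addr()?;
    let handle = std::thread::spawn(move || -> io::Result<()> {
        let mut buf = [0u8; 1500];
        let (len, from) = server.recv_from(&mut buf)?;
        if reply {
            server.send_to(&buf[..len], from)?;
        }
        Ok(())
    });
    let result = request(&client, server_addr, b"ping", Duration::from_millis(200));
    handle.join().expect("server thread panicked")?;
    result
}

fn main() {
    println!("{:?}", demo(true));
    println!("{:?}", demo(false).map_err(|e| e.kind()));
}
```

The Tokio version above expresses the same race with select2, without blocking the thread while it waits.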