I've been working on a system that implements a small request/response protocol over UDP. The server uses Tokio, but I'm having trouble converting the initial version of the client, which used standard synchronous calls, to Tokio. Strictly speaking, the sequence is synchronous: the receive should happen only after a successful send. I thought using Tokio would make it easier to add more complex behaviors, such as imposing a timeout on, or retrying after, a tardy response.
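For context, the synchronous flow I am trying to reproduce looks roughly like the sketch below. This is not the real client: `request_with_retry` and its parameters are hypothetical names made up for illustration, the payload is plain bytes rather than the encrypted request, and only the standard library is used.

```rust
use std::io;
use std::net::{SocketAddr, UdpSocket};
use std::time::Duration;

/// Hypothetical helper: send `req` to `remote`, then wait up to `timeout`
/// for one reply, repeating the exchange at most `attempts` times.
fn request_with_retry(
    sock: &UdpSocket,
    remote: SocketAddr,
    req: &[u8],
    timeout: Duration,
    attempts: u32,
) -> io::Result<Vec<u8>> {
    // A blocking read now gives up after `timeout` instead of waiting forever.
    sock.set_read_timeout(Some(timeout))?;
    let mut buf = [0u8; 1500];
    for _ in 0..attempts {
        // The receive is attempted only after a successful send.
        sock.send_to(req, remote)?;
        match sock.recv_from(&mut buf) {
            Ok((len, from)) if from == remote => return Ok(buf[..len].to_vec()),
            // Datagram from an unexpected peer: ignore it and resend.
            Ok(_) => continue,
            // A timeout is reported as WouldBlock or TimedOut, depending on the platform.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock
                || e.kind() == io::ErrorKind::TimedOut => continue,
            Err(e) => return Err(e),
        }
    }
    Err(io::Error::new(io::ErrorKind::TimedOut, "no response from server"))
}
```

Calling it with a bound socket, the server address, a request buffer, a timeout such as `Duration::from_millis(500)`, and a small attempt count either returns the reply bytes or fails with a `TimedOut` error once the attempts are used up.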
The Tokio version of the client uses a UdpCodec trait and a UdpFramed socket. This is the codec:
    impl UdpCodec for ClientCodec {
        type In = Result<SessResp, AccessError>;
        type Out = (SocketAddr, IpAddr);

        fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
            println!("receiving from {}", addr);
            Ok(match SessResp::from_msg(&buf) {
                Ok(recv_resp) => Ok(recv_resp),
                Err(e) => Err(e),
            })
        }

        fn encode(&mut self, (remote_addr, client_addr): Self::Out, into: &mut Vec<u8>) -> SocketAddr {
            let msg = AccessReq::new(ReqType::TimedAccess, client_addr).to_msg();
            println!("sending");
            self.state.nonce.increment_le_inplace();
            into.extend(&self.state.nonce[..]);
            let encrypted_req_packet = box_::seal(&msg, &self.state.nonce,
                                                  &self.key_data.peer_public,
                                                  &self.key_data.secret);
            into.extend(encrypted_req_packet);
            remote_addr
        }
    }
This is the code that uses the socket and codec:
    let codec = ClientCodec::new(state_filename, key_data_filename)?;
    let sock = UdpSocket::bind(&bind_addr, &handle)
        .map_err(|e| { AccessError::IoError(e) })?;
    let (framed_tx, framed_rx) = sock.framed(codec).split();
    let send_req = framed_tx.send((remote_addr, client_addr));
    Ok(drop(core.run(send_req)))
This works, but it's only the send part of the protocol. I tried numerous things to tack on a receive after the send. The simplest was:
    let send_req = framed_tx.send((remote_addr, client_addr))
        .and_then(|_| { framed_rx.take(1) } );
This resulted in:
        |
    102 |         .and_then(|_| { framed_rx.take(1) } );
        |          ^^^^^^^^ the trait futures::Future is not implemented for futures::stream::Take<futures::stream::SplitStream<tokio_core::net::UdpFramed<ClientCodec>>>
        |
        = note: required because of the requirements on the impl of futures::IntoFuture for futures::stream::Take<futures::stream::SplitStream<tokio_core::net::UdpFramed<ClientCodec>>>
    error[E0277]: the trait bound futures::stream::Take<futures::stream::SplitStream<tokio_core::net::UdpFramed<ClientCodec>>>: futures::Future is not satisfied
I think this boils down to "take() does not return a future, and that's what's needed here". I tried tacking on the into_future() call:
    let send_req = framed_tx.send((remote_addr, client_addr))
        .and_then(|_| { framed_rx.take(1).into_future() } );
But that resulted in:
    102 |         .and_then(|_| { framed_rx.take(1).into_future() } );
        |          ^^^^^^^^ expected tuple, found struct `std::io::Error`
        |
        = note: expected type `(std::io::Error, futures::stream::Take<futures::stream::SplitStream<tokio_core::net::UdpFramed<ClientCodec>>>)`
                   found type `std::io::Error`
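My tentative reading of this message is that the two steps fail in different shapes: the send fails with a bare `std::io::Error`, while in futures 0.1 `into_future()` on a stream fails with the pair `(error, stream)`, and `and_then` wants both error types to be the same. The sketch below models only that mismatch, using plain `Result` values from the standard library rather than the futures API. Every name in it (`IoError`, `Rx`, `send`, `first_item`, `send_then_receive`) is a hypothetical stand-in, not part of tokio or futures.

```rust
// Hypothetical stand-ins: plain `Result`s model the outcomes of the two futures.
#[derive(Debug, PartialEq)]
struct IoError(&'static str); // plays the role of std::io::Error

#[derive(Debug, PartialEq)]
struct Rx(Vec<u8>); // plays the role of the receive half (framed_rx)

// Models the send: on failure it yields a bare error.
fn send(ok: bool) -> Result<(), IoError> {
    if ok { Ok(()) } else { Err(IoError("send failed")) }
}

// Models `into_future()` on a stream: success yields (first item, rest of stream),
// failure yields the pair (error, stream).
fn first_item(mut rx: Rx, fail: bool) -> Result<(Option<u8>, Rx), (IoError, Rx)> {
    if fail {
        return Err((IoError("recv failed"), rx));
    }
    let item = if rx.0.is_empty() { None } else { Some(rx.0.remove(0)) };
    Ok((item, rx))
}

// `Result::and_then` also requires one error type across both steps, so the
// (error, stream) pair is reduced to the bare error before chaining.
fn send_then_receive(rx: Rx, send_ok: bool, recv_fail: bool) -> Result<Option<u8>, IoError> {
    send(send_ok)
        .and_then(|_| first_item(rx, recv_fail).map_err(|(e, _rx)| e))
        .map(|(item, _rest)| item)
}
```

If this reading is right, the analogous change to my chain would be a `map_err(|(e, _)| e)` after `into_future()`, but I have not confirmed that against the real types.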
I tried a few other things, but this is a puzzle that I haven't been able to solve by experimenting and course-correcting based on hints from the error messages. What is the technique for reading a packet on successful completion of the send?