I'm making a server using tokio. (This server.)
It's been pretty smooth sailing thus far. My inner per-client task is structured like this: (full, hideous code here)
/// Repeatedly uses `socket.read_until()` until a whole message is received
async fn get_message(socket: &mut BufReader<TcpStream>)
-> std::io::Result<Option<Message>>;
/// Repeatedly uses `socket.write_all()` until the whole message is sent
async fn send_response(socket: &mut BufReader<TcpStream>, response: Message>);
async fn inner_client(socket: TcpStream) -> std::io::Result<()> {
let socket = BufReader::new(socket);
loop {
let message = match get_message(&mut socket).await? {
Some(x) => x,
None => return Ok(()),
};
// handle the message, which might include:
send_response(&mut socket, some_response);
}
}
All well and good so far. But now I have a problem. Up until now, all server-to-client messages been in direct response to a client-to-server message. Now I've added a message type that requires a message to be sent to other connected clients, without them having to poll for it.
What I want is something like:
async fn inner_client(socket: TcpStream, oob_messages: Receiver<Message>)
-> std::io::Result<()> {
let socket = BufReader::new(socket);
loop {
tokio::select! {
message = get_message(&mut socket) => {
let message = match message {
Some(x) => x,
None => return Ok(()),
};
// handle the message, which might include:
send_response(&mut socket, some_response);
},
oob = oob_messages.recv() => {
// something like:
send_response(&mut socket, some_response);
},
}
}
}
But even without trying to compile that code, I can see a problem; it seems that if the oob_messages.recv()
branch is executed, then next time through the loop a new get_message
future would be started and the previous one discarded. This would cause problems... I think. (The real get_message
uses a buffer to hold an incomplete message, omitted from the above simplification, but I don't think that's enough to save it.)
I've tried structuring it so that the get_message
future is held onto "outside" of the loop, and recreated every time a message is received, but aside from being clunky and ugly, that creates a situation where socket
is mutably borrowed more than once, which is a huge no-no. Another solution I've considered is splitting the client task into three: one that reads messages from the client, one that reads out-of-band messages, and one that does select!
on an mpsc
channel written to by the others... but then won't I be in trouble if the "read messages" task is waiting on socket.read_until
and the select!
task tries to write? (Guarding the socket with a Mutex
would just make it deadlock there...) And then how do I cleanly break things off once the client has disconnected?
I'm just not sure how to proceed from here. This is my first serious attempt at using futures in any language, and I'm definitely not used to the pattern yet. Any help would be very much appreciated.