Hello, everyone!
Once again I am in a dilemma over choosing an appropriate solution for a seemingly "simple" problem. I run into this pattern quite often in stateful services.
Let's say we have a web service that exposes a web socket as a stateful endpoint to web browsers/clients, and we handle each connection with an async fn in tokio. So far so good: we can await incoming messages from the client and send responses in a simple loop {recv_send.await}. But there are situations where we want to await not just the client's web socket but other channels/futures as well, for example a broadcast message channel or some internal service events, and this is when writing the async fn becomes cumbersome.
Pseudocode for the way I would like to write this asynchronous websocket handler:
    async fn ws_handler(socket, rx_broadcast, ..) {
        loop {
            match any_registered_future(web_socket, rx_broadcast, ..).await {
                DataRecvdOnWebSocket => {
                    // Parse the user's message, take appropriate action and maybe
                    // send a response to the websocket client.
                    // Maybe the client sent us a command to subscribe to the
                    // broadcast channel or some other channel, so we must be able
                    // to extend the list passed to any_registered_future.
                }
                WebSocketClosed => {
                    // Drop allocated resources for this connection, break the
                    // loop and close the socket on our end as well.
                }
                DataReceivedOnBroadcast => {
                    // Send broadcast message data to the websocket client. Maybe
                    // the broadcast channel is closed; in that case we just want
                    // to await on the client's websocket and respond based on state.
                }
            }
        }
    }
It is actually possible to write code like the pseudocode above. I could create an mpsc::channel and spawn two async fns per connected client: the main task would listen on the Receiver, which delivers an enum of messages, while a second async fn handles only the web socket and holds a Sender. I could clone the Sender and give it to the broadcast task. That way all messages are funneled into a single channel, and the code handling them would be clean, readable, extensible (add as many producers as you like in the future) and easier to reason about.
What I do not like about this approach is that it increases latency and memory usage. It feels especially wasteful when I only need to await two or three futures. What solutions do you choose for these cases?
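For reference, the single-channel design described above can be sketched roughly as follows. This is only an illustration under stated assumptions: it uses std::sync::mpsc and OS threads as stand-ins for tokio's channel and spawned tasks so that it runs without any crate, and the names Event, spawn_socket_reader, spawn_broadcast_forwarder and run_funnel_sketch are hypothetical.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical event type: one variant per kind of message the handler reacts to.
#[derive(Debug, PartialEq)]
enum Event {
    Socket(String),
    SocketClosed,
    Broadcast(u32),
}

// Stand-in for the task that only reads from the web socket.
fn spawn_socket_reader(tx: Sender<Event>, msgs: Vec<String>) {
    thread::spawn(move || {
        for m in msgs {
            // Stop if the handler has gone away.
            if tx.send(Event::Socket(m)).is_err() {
                return;
            }
        }
        let _ = tx.send(Event::SocketClosed);
    });
}

// Stand-in for the task that forwards broadcast messages into the same channel.
fn spawn_broadcast_forwarder(tx: Sender<Event>, values: Vec<u32>) {
    thread::spawn(move || {
        for v in values {
            if tx.send(Event::Broadcast(v)).is_err() {
                return;
            }
        }
    });
}

// The handler: a single receiver and a single match over all event kinds.
fn run_funnel_sketch() -> (Vec<String>, Vec<u32>, bool) {
    let (tx, rx) = channel();
    spawn_socket_reader(tx.clone(), vec!["hello".into(), "subscribe".into()]);
    spawn_broadcast_forwarder(tx, vec![1, 2, 3]);

    let (mut from_socket, mut from_broadcast, mut closed) = (Vec::new(), Vec::new(), false);
    // Iteration ends once every Sender has been dropped.
    for event in rx {
        match event {
            Event::Socket(text) => from_socket.push(text),
            Event::Broadcast(v) => from_broadcast.push(v),
            // A real handler would probably break here; the sketch keeps
            // draining so its result does not depend on thread timing.
            Event::SocketClosed => closed = true,
        }
    }
    (from_socket, from_broadcast, closed)
}
```

Calling run_funnel_sketch() returns the socket messages, the broadcast values and whether the close event was seen. Adding another producer is just one more Sender clone and one more enum variant, which is the extensibility mentioned above; the price is the extra channel hop per message.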
I've been experimenting with various approaches, like tokio::task::JoinSet, futures::select!, futures::future::select, a pure mpsc::channel and a simple await on recv.
The problem with select is that I must pass it a pinned future, but to create that future I must call socket.recv(), which takes &mut. Once the future has resolved I cannot "recreate" it, for two main reasons: pin_mut! shadows my variable, and the Rust compiler cannot know statically that the &mut to the socket can now be used again. I don't know what I could do about pinning short of writing unsafe code (which I would like to avoid).
What I like about select is that it is defined using generics, so the code should be fast and efficient, with a small memory footprint.
Another idea I had was to write my own struct, implement Future for it and await on that. The trouble is that socket.recv() returns an "anonymous future" (if that's the correct terminology), and I cannot store it in a struct in a useful way except through generics. Essentially I would need to store the socket and a future returned from recv()/send(), and somehow make a self-referential pin to it so that I can use it in poll, but... maybe my skill is lacking in this regard... Can someone give me an idea or a pointer? Is it even possible to achieve this?
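One possible way around the self-referential problem, sketched here without unsafe: let the future own the socket and hand it back when it completes, and box the future so it has a nameable type that fits in a struct. Everything below is an assumption made for illustration: Source is a hypothetical stand-in for the socket or a channel, block_on and YieldNow exist only so the sketch runs with std alone, and Handler, Event, recv_owned, poll_slot and run_handler_sketch are made-up names. It costs one heap allocation per recv, so it trades some of the efficiency of the generic select for simplicity.

```rust
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Minimal std-only executor so the sketch runs without a runtime crate.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) { self.0.unpark(); }
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(v) => return v,
            Poll::Pending => thread::park(),
        }
    }
}

// Returns Pending on its first poll (after requesting a wake-up), then Ready.
struct YieldNow(bool);
impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 { return Poll::Ready(()); }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Hypothetical stand-in for a socket or channel with `async fn recv(&mut self)`.
struct Source { items: Vec<u32>, delay: usize }
impl Source {
    async fn recv(&mut self) -> Option<u32> {
        for _ in 0..self.delay { YieldNow(false).await; } // simulate waiting
        if self.items.is_empty() { None } else { Some(self.items.remove(0)) }
    }
}

// The future owns the source and hands it back, so no borrow outlives a poll.
type RecvFut = Pin<Box<dyn Future<Output = (Source, Option<u32>)>>>;
fn recv_owned(mut src: Source) -> RecvFut {
    Box::pin(async move {
        let item = src.recv().await;
        (src, item)
    })
}

#[derive(Debug, PartialEq)]
enum Event { Socket(u32), SocketClosed, Broadcast(u32), BroadcastClosed }

// Polls one slot; when it completes, re-arm it, or clear it if the source closed.
fn poll_slot(slot: &mut Option<RecvFut>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
    let polled = match slot.as_mut() {
        Some(fut) => fut.as_mut().poll(cx),
        None => return Poll::Pending, // closed earlier, nothing to wait for
    };
    if let Poll::Ready((src, item)) = polled {
        *slot = item.map(|_| recv_owned(src));
        return Poll::Ready(item);
    }
    Poll::Pending
}

struct Handler { socket: Option<RecvFut>, broadcast: Option<RecvFut> }
impl Handler {
    fn poll_event(&mut self, cx: &mut Context<'_>) -> Poll<Option<Event>> {
        if let Poll::Ready(item) = poll_slot(&mut self.socket, cx) {
            return Poll::Ready(Some(item.map_or(Event::SocketClosed, Event::Socket)));
        }
        if let Poll::Ready(item) = poll_slot(&mut self.broadcast, cx) {
            return Poll::Ready(Some(item.map_or(Event::BroadcastClosed, Event::Broadcast)));
        }
        if self.socket.is_none() && self.broadcast.is_none() {
            return Poll::Ready(None); // both sources closed
        }
        Poll::Pending
    }
    async fn next_event(&mut self) -> Option<Event> {
        poll_fn(|cx| self.poll_event(cx)).await
    }
}

// Drives the handler until both hypothetical sources are closed.
fn run_handler_sketch() -> Vec<Event> {
    let mut handler = Handler {
        socket: Some(recv_owned(Source { items: vec![1, 2], delay: 2 })),
        broadcast: Some(recv_owned(Source { items: vec![10], delay: 1 })),
    };
    block_on(async move {
        let mut seen = Vec::new();
        while let Some(ev) = handler.next_event().await { seen.push(ev); }
        seen
    })
}
```

A pending future is kept across iterations and only the finished one is re-created, and a closed source is simply no longer polled. Two caveats: the socket slot is always polled first, so a very busy socket could starve the broadcast slot, and to move such a handler into tokio::spawn the boxed future type would presumably need a `+ Send` bound.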
A simplified code example I've been playing around with (the relevant parts must be uncommented in main):
#![allow(unused_imports)]
#![allow(dead_code)]
#![allow(unused_variables)]
use tokio::{
    sync::mpsc::{channel, Receiver, Sender},
    time::{sleep, Duration},
};
use futures::{
    future::{self, Either, FutureExt},
    pin_mut, select,
};
async fn some_async_sender(tx: Sender<usize>, num_iter: usize, sleep_millis: u64) {
    // Use the capacity as a "unique identifier" to distinguish which task
    // the message is being sent from.
    let c = tx.capacity();
    for _ in 0..num_iter {
        // Sleep to simulate "randomness in time" for incoming data in the
        // main task.
        sleep(Duration::from_millis(sleep_millis)).await;
        if tx.send(c).await.is_err() {
            return;
        }
    }
    // Simulate a long block while keeping the Sender alive, so that the main
    // task gets stuck if it is not able to handle each channel separately.
    sleep(Duration::from_millis(12000)).await;
}
#[tokio::main]
async fn main() {
    let (tx_1, rx_1) = channel::<usize>(5);
    let (tx_2, rx_2) = channel::<usize>(10);
    tokio::spawn(some_async_sender(tx_1, 3, 500));
    tokio::spawn(some_async_sender(tx_2, 5, 250));
    println!("Main starts awaiting.");
    blocking_handler(rx_1, rx_2).await;
    // select_once_handler(rx_1, rx_2).await;
    // loop_select_once_handler(rx_1, rx_2).await;
    // I would like this to work or get it working.
    // loop_select_generic(rx_1, rx_2).await;
}
// This will get stuck if one of the channels does not send data but does not
// get closed either [just to demonstrate the problem, far from a wanted solution].
async fn blocking_handler(mut rx_1: Receiver<usize>, mut rx_2: Receiver<usize>) {
    loop {
        let r_1 = rx_1.recv().await;
        let r_2 = rx_2.recv().await;
        println!("got r_1: {:?}, r_2: {:?}", r_1, r_2);
        if let (None, None) = (r_1, r_2) {
            break;
        }
    }
}
// The problem with this is that we handle a single result only.
async fn select_once_handler(mut rx_1: Receiver<usize>, mut rx_2: Receiver<usize>) {
    let t1 = rx_1.recv().fuse();
    let t2 = rx_2.recv().fuse();
    pin_mut!(t1, t2);
    select! {
        val = t1 => println!("t1 val: {:?}", val),
        val = t2 => println!("t2 val: {:?}", val),
    }
}
// I am not sure about this one though, because we create 2 futures but only
// "use" one of them; the second gets dropped, and in the next iteration we do
// the same thing again.
// While it works in this case, I do not like to assume that this is the
// correct way to do things: rx_1 is mutable, and when I call rx_1.recv() I
// cannot be 100% sure that it is fine to drop the created future before it
// has completed.
// This has another problem: once one channel is closed, we should await only
// on the channel that is not closed (as written, the loop never exits).
async fn loop_select_once_handler(mut rx_1: Receiver<usize>, mut rx_2: Receiver<usize>) {
    loop {
        let t1 = rx_1.recv().fuse();
        let t2 = rx_2.recv().fuse();
        pin_mut!(t1, t2);
        select! {
            val = t1 => println!("t1 val: {:?}", val),
            val = t2 => println!("t2 val: {:?}", val),
        }
    }
}
// This is the solution I would like to work, but it does not compile due to
// various problems.
// Its advantage is that it never drops a pending recv() future and creates a
// new one on the same channel, so I would not have to worry about whether an
// unused future has some impact on the underlying data structures.
// async fn loop_select_generic(mut rx_1: Receiver<usize>, mut rx_2: Receiver<usize>) {
//     use Either::*;
//     let mut t1 = rx_1.recv();
//     let mut t2 = rx_2.recv();
//     loop {
//         pin_mut!(t1, t2);
//         match future::select(t1, t2).await {
//             Left((v1, t2_unfinished)) => {
//                 t2 = t2_unfinished;
//                 t1 = rx_1.recv();
//             }
//             Right((v2, t1_unfinished)) => {
//                 t1 = t1_unfinished;
//                 t2 = rx_2.recv();
//             }
//         }
//     }
// }