I am trying to take the stream of TcpStreams produced by TcpListener::incoming() and turn it into a stream of requests with metadata, so that the handler logic can be request-oriented rather than connection-oriented.
My strategy right now is to create a new channel for each client and then flatten all those channels into a single stream. Each channel also needs to drive two separate futures for reading and writing data to the underlying TcpStream, so I'm using take_until on each client's stream. The handler in the select! macro needs a channel to send responses back to write_fut, so the request sender also sends a clone of the response sender with each request.
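To make the channel layout concrete, here is a rough synchronous sketch of the same topology. It is not the async code in question: it assumes plain threads and the unbounded std::sync::mpsc channels instead of futures::channel::mpsc with a capacity of 0, and the names PeerId, Request and echo_all are made up for illustration. A single shared request channel stands in for the flattened stream, and each request carries a clone of that client's response sender.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for the peer address (the real code uses SocketAddr).
type PeerId = usize;

// One request: who sent it, the payload byte, and where to send the reply.
type Request = (PeerId, u8, mpsc::Sender<u8>);

/// Simulates one client per input vector and returns what each client
/// received back from a simple echo handler.
fn echo_all(inputs: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
    // Shared request channel: plays the role of the flattened stream.
    let (req_tx, req_rx) = mpsc::channel::<Request>();

    let mut clients = Vec::new();
    for (peer, bytes) in inputs.into_iter().enumerate() {
        let req_tx = req_tx.clone();
        // Per-client response channel: plays the role of res_tx / res_rx.
        let (res_tx, res_rx) = mpsc::channel::<u8>();
        clients.push(thread::spawn(move || {
            // "Read side": forward each byte together with a response sender.
            for b in bytes {
                req_tx.send((peer, b, res_tx.clone())).unwrap();
            }
            // Drop our own senders so both channels can close once the
            // handler has dropped the clones it received.
            drop(res_tx);
            drop(req_tx);
            // "Write side": collect responses until every sender is gone.
            res_rx.iter().collect::<Vec<u8>>()
        }));
    }
    // Drop the original sender so req_rx ends when all clients are done.
    drop(req_tx);

    // Request-oriented handler: it sees requests, not connections.
    for (_peer, byte, res_tx) in req_rx {
        res_tx.send(byte).unwrap(); // echo
    }

    clients.into_iter().map(|c| c.join().unwrap()).collect()
}

fn main() {
    let echoed = echo_all(vec![b"hi".to_vec(), b"yo".to_vec()]);
    println!("{echoed:?}");
}
```

In this sketch the threads take the place of read_fut and write_fut, so nothing corresponds to take_until; the async version has to get the per-client futures polled some other way.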
However, the stream I'm currently producing doesn't satisfy FusedFuture, so it can't be used with select!. I'm not sure why this is the case.
I'm also not confident I'm using take_until the way it's intended.
Here's my simplified code:
    use std::net::SocketAddr;

    use futures::{channel::mpsc, future, AsyncReadExt, SinkExt, StreamExt};

    use async_std::{
        io::{ReadExt, WriteExt},
        net::TcpListener,
        task,
    };

    fn main() {
        task::block_on(async { run_test().await });
    }

    async fn run_test() {
        let tcp_listener = TcpListener::bind("127.0.0.1:8080".parse::<SocketAddr>().unwrap())
            .await
            .unwrap();

        let service_stream = tcp_listener
            .incoming()
            .map(|stream| {
                let stream = stream.unwrap();
                let peer_addr = stream.peer_addr().unwrap();
                let (reader, mut writer) = stream.split();
                let (res_tx, mut res_rx) = mpsc::channel(0);
                let (req_tx, req_rx) = mpsc::channel(0);

                // Future for receiving requests from peer
                let read_fut = async move {
                    let req_tx = req_tx.clone();
                    reader
                        .bytes()
                        .for_each(|req| {
                            let mut req_tx = req_tx.clone();
                            let res_tx = res_tx.clone();
                            async move {
                                req_tx.send((peer_addr, req, res_tx)).await.unwrap();
                            }
                        })
                        .await;
                };

                // Future for writing responses to peer
                let write_fut = async move {
                    while let Some(byte) = res_rx.next().await {
                        writer.write_all(&[byte]).await.unwrap()
                    }
                };

                // Stream needs to drive both futures as it produces items
                req_rx.take_until(future::join(read_fut, write_fut))
            })
            .flatten()
            .fuse();

        loop {
            futures::select! {
                (peer_addr, req, tx) = service_stream => {
                    println!("got request from: {peer_addr}");
                    tx.send(req).await.unwrap() // echo
                }
                // process another stream
            }
        }
    }